1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDRepOp.h"
18 #include "messages/MOSDRepOpReply.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPull.h"
21 #include "messages/MOSDPGPushReply.h"
22 #include "common/EventTrace.h"
23 #include "include/random.h"
24 #include "include/util.h"
27 #define dout_context cct
28 #define dout_subsys ceph_subsys_osd
29 #define DOUT_PREFIX_ARGS this
31 #define dout_prefix _prefix(_dout, this)
32 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
33 return pgb
->get_parent()->gen_dbg_prefix(*_dout
);
37 class PG_SendMessageOnConn
: public Context
{
38 PGBackend::Listener
*pg
;
43 PGBackend::Listener
*pg
,
45 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
46 void finish(int) override
{
47 pg
->send_message_osd_cluster(reply
, conn
.get());
51 class PG_RecoveryQueueAsync
: public Context
{
52 PGBackend::Listener
*pg
;
53 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
55 PG_RecoveryQueueAsync(
56 PGBackend::Listener
*pg
,
57 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
58 void finish(int) override
{
59 pg
->schedule_recovery_work(c
.release());
64 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
65 ReplicatedBackend
*pg
;
67 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
69 void finish(int r
) override
{
74 static void log_subop_stats(
76 OpRequestRef op
, int subop
)
78 utime_t now
= ceph_clock_now();
79 utime_t latency
= now
;
80 latency
-= op
->get_req()->get_recv_stamp();
83 logger
->inc(l_osd_sop
);
84 logger
->tinc(l_osd_sop_lat
, latency
);
87 if (subop
!= l_osd_sop_pull
) {
88 uint64_t inb
= op
->get_req()->get_data().length();
89 logger
->inc(l_osd_sop_inb
, inb
);
90 if (subop
== l_osd_sop_w
) {
91 logger
->inc(l_osd_sop_w_inb
, inb
);
92 logger
->tinc(l_osd_sop_w_lat
, latency
);
93 } else if (subop
== l_osd_sop_push
) {
94 logger
->inc(l_osd_sop_push_inb
, inb
);
95 logger
->tinc(l_osd_sop_push_lat
, latency
);
97 ceph_abort_msg("no support subop");
99 logger
->tinc(l_osd_sop_pull_lat
, latency
);
103 ReplicatedBackend::ReplicatedBackend(
104 PGBackend::Listener
*pg
,
106 ObjectStore::CollectionHandle
&c
,
109 PGBackend(cct
, pg
, store
, coll
, c
) {}
111 void ReplicatedBackend::run_recovery_op(
112 PGBackend::RecoveryHandle
*_h
,
115 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
116 send_pushes(priority
, h
->pushes
);
117 send_pulls(priority
, h
->pulls
);
118 send_recovery_deletes(priority
, h
->deletes
);
122 int ReplicatedBackend::recover_object(
123 const hobject_t
&hoid
,
125 ObjectContextRef head
,
126 ObjectContextRef obc
,
130 dout(10) << __func__
<< ": " << hoid
<< dendl
;
131 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
132 if (get_parent()->get_local_missing().is_missing(hoid
)) {
142 int started
= start_pushes(
147 pushing
[hoid
].clear();
154 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
156 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
157 i
!= pull_from_peer
.end();
159 if (osdmap
->is_down(i
->first
.osd
)) {
160 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
161 << ", osdmap has it marked down" << dendl
;
162 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
163 j
!= i
->second
.end();
165 get_parent()->cancel_pull(*j
);
166 clear_pull(pulling
.find(*j
), false);
168 pull_from_peer
.erase(i
++);
175 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
177 dout(10) << __func__
<< ": " << op
<< dendl
;
178 switch (op
->get_req()->get_type()) {
179 case MSG_OSD_PG_PULL
:
186 bool ReplicatedBackend::_handle_message(
190 dout(10) << __func__
<< ": " << op
<< dendl
;
191 switch (op
->get_req()->get_type()) {
192 case MSG_OSD_PG_PUSH
:
196 case MSG_OSD_PG_PULL
:
200 case MSG_OSD_PG_PUSH_REPLY
:
204 case MSG_OSD_REPOP
: {
209 case MSG_OSD_REPOPREPLY
: {
220 void ReplicatedBackend::clear_recovery_state()
222 // clear pushing/pulling maps
223 for (auto &&i
: pushing
) {
224 for (auto &&j
: i
.second
) {
225 get_parent()->release_locks(j
.second
.lock_manager
);
230 for (auto &&i
: pulling
) {
231 get_parent()->release_locks(i
.second
.lock_manager
);
234 pull_from_peer
.clear();
237 void ReplicatedBackend::on_change()
239 dout(10) << __func__
<< dendl
;
240 for (auto& op
: in_progress_ops
) {
241 delete op
.second
->on_commit
;
242 op
.second
->on_commit
= nullptr;
244 in_progress_ops
.clear();
245 clear_recovery_state();
248 int ReplicatedBackend::objects_read_sync(
249 const hobject_t
&hoid
,
255 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
258 int ReplicatedBackend::objects_readv_sync(
259 const hobject_t
&hoid
,
260 map
<uint64_t, uint64_t>&& m
,
264 interval_set
<uint64_t> im(std::move(m
));
265 auto r
= store
->readv(ch
, ghobject_t(hoid
), im
, *bl
, op_flags
);
267 m
= std::move(im
).detach();
272 void ReplicatedBackend::objects_read_async(
273 const hobject_t
&hoid
,
274 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
275 pair
<bufferlist
*, Context
*> > > &to_read
,
276 Context
*on_complete
,
279 ceph_abort_msg("async read is not used by replica pool");
282 class C_OSD_OnOpCommit
: public Context
{
283 ReplicatedBackend
*pg
;
284 ceph::ref_t
<ReplicatedBackend::InProgressOp
> op
;
286 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ceph::ref_t
<ReplicatedBackend::InProgressOp
> op
)
287 : pg(pg
), op(std::move(op
)) {}
288 void finish(int) override
{
293 void generate_transaction(
294 PGTransactionUPtr
&pgt
,
296 vector
<pg_log_entry_t
> &log_entries
,
297 ObjectStore::Transaction
*t
,
298 set
<hobject_t
> *added
,
299 set
<hobject_t
> *removed
,
300 const ceph_release_t require_osd_release
= ceph_release_t::unknown
)
304 ceph_assert(removed
);
306 for (auto &&le
: log_entries
) {
307 le
.mark_unrollbackable();
308 auto oiter
= pgt
->op_map
.find(le
.soid
);
309 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
310 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
311 encode(oiter
->second
.updated_snaps
->second
, bl
);
313 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
317 pgt
->safe_create_traverse(
318 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
319 const hobject_t
&oid
= obj_op
.first
;
320 const ghobject_t goid
=
321 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
322 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
325 if (op
.is_fresh_object()) {
327 } else if (op
.is_delete()) {
328 removed
->insert(oid
);
332 if (op
.delete_first
) {
333 t
->remove(coll
, goid
);
338 [&](const PGTransaction::ObjectOperation::Init::None
&) {
340 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
341 if (require_osd_release
>= ceph_release_t::octopus
) {
342 t
->create(coll
, goid
);
344 t
->touch(coll
, goid
);
347 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
351 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
354 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
355 ceph_assert(op
.source
.is_temp());
356 t
->collection_move_rename(
359 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
365 t
->truncate(coll
, goid
, op
.truncate
->first
);
366 if (op
.truncate
->first
!= op
.truncate
->second
)
367 t
->truncate(coll
, goid
, op
.truncate
->second
);
370 if (!op
.attr_updates
.empty()) {
371 map
<string
, bufferlist
> attrs
;
372 for (auto &&p
: op
.attr_updates
) {
374 attrs
[p
.first
] = *(p
.second
);
376 t
->rmattr(coll
, goid
, p
.first
);
378 t
->setattrs(coll
, goid
, attrs
);
382 t
->omap_clear(coll
, goid
);
384 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
386 for (auto &&up
: op
.omap_updates
) {
387 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
389 case UpdateType::Remove
:
390 t
->omap_rmkeys(coll
, goid
, up
.second
);
392 case UpdateType::Insert
:
393 t
->omap_setkeys(coll
, goid
, up
.second
);
395 case UpdateType::RemoveRange
:
396 t
->omap_rmkeyrange(coll
, goid
, up
.second
);
401 // updated_snaps doesn't matter since we marked unrollbackable
404 auto &hint
= *(op
.alloc_hint
);
408 hint
.expected_object_size
,
409 hint
.expected_write_size
,
413 for (auto &&extent
: op
.buffer_updates
) {
414 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
417 [&](const BufferUpdate::Write
&op
) {
426 [&](const BufferUpdate::Zero
&op
) {
433 [&](const BufferUpdate::CloneRange
&op
) {
434 ceph_assert(op
.len
== extent
.get_len());
437 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
447 void ReplicatedBackend::submit_transaction(
448 const hobject_t
&soid
,
449 const object_stat_sum_t
&delta_stats
,
450 const eversion_t
&at_version
,
451 PGTransactionUPtr
&&_t
,
452 const eversion_t
&trim_to
,
453 const eversion_t
&min_last_complete_ondisk
,
454 const vector
<pg_log_entry_t
> &_log_entries
,
455 std::optional
<pg_hit_set_history_t
> &hset_history
,
456 Context
*on_all_commit
,
459 OpRequestRef orig_op
)
465 vector
<pg_log_entry_t
> log_entries(_log_entries
);
466 ObjectStore::Transaction op_t
;
467 PGTransactionUPtr
t(std::move(_t
));
468 set
<hobject_t
> added
, removed
;
469 generate_transaction(
476 get_osdmap()->require_osd_release
);
477 ceph_assert(added
.size() <= 1);
478 ceph_assert(removed
.size() <= 1);
480 auto insert_res
= in_progress_ops
.insert(
483 ceph::make_ref
<InProgressOp
>(
488 ceph_assert(insert_res
.second
);
489 InProgressOp
&op
= *insert_res
.first
->second
;
491 op
.waiting_for_commit
.insert(
492 parent
->get_acting_recovery_backfill_shards().begin(),
493 parent
->get_acting_recovery_backfill_shards().end());
501 min_last_complete_ondisk
,
502 added
.size() ? *(added
.begin()) : hobject_t(),
503 removed
.size() ? *(removed
.begin()) : hobject_t(),
509 add_temp_objs(added
);
510 clear_temp_objs(removed
);
512 parent
->log_operation(
517 min_last_complete_ondisk
,
521 op_t
.register_on_commit(
522 parent
->bless_context(
523 new C_OSD_OnOpCommit(this, &op
)));
525 vector
<ObjectStore::Transaction
> tls
;
526 tls
.push_back(std::move(op_t
));
528 parent
->queue_transactions(tls
, op
.op
);
529 if (at_version
!= eversion_t()) {
530 parent
->op_applied(at_version
);
534 void ReplicatedBackend::op_commit(const ceph::ref_t
<InProgressOp
>& op
)
536 if (op
->on_commit
== nullptr) {
542 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
543 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
545 op
->op
->mark_event("op_commit");
546 op
->op
->pg_trace
.event("op commit");
549 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
551 if (op
->waiting_for_commit
.empty()) {
552 op
->on_commit
->complete(0);
554 in_progress_ops
.erase(op
->tid
);
558 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
560 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
561 auto r
= op
->get_req
<MOSDRepOpReply
>();
562 ceph_assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
566 // must be replication.
567 ceph_tid_t rep_tid
= r
->get_tid();
568 pg_shard_t from
= r
->from
;
570 auto iter
= in_progress_ops
.find(rep_tid
);
571 if (iter
!= in_progress_ops
.end()) {
572 InProgressOp
&ip_op
= *iter
->second
;
573 const MOSDOp
*m
= nullptr;
575 m
= ip_op
.op
->get_req
<MOSDOp
>();
578 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
579 << " ack_type " << (int)r
->ack_type
583 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
584 << " ack_type " << (int)r
->ack_type
590 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
591 ceph_assert(ip_op
.waiting_for_commit
.count(from
));
592 ip_op
.waiting_for_commit
.erase(from
);
594 ip_op
.op
->mark_event("sub_op_commit_rec");
595 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
598 // legacy peer; ignore
601 parent
->update_peer_last_complete_ondisk(
603 r
->get_last_complete_ondisk());
605 if (ip_op
.waiting_for_commit
.empty() &&
607 ip_op
.on_commit
->complete(0);
609 in_progress_ops
.erase(iter
);
614 int ReplicatedBackend::be_deep_scrub(
615 const hobject_t
&poid
,
617 ScrubMapBuilder
&pos
,
620 dout(10) << __func__
<< " " << poid
<< " pos " << pos
<< dendl
;
622 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
|
623 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
|
624 CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE
;
627 sleeptime
.set_from_double(cct
->_conf
->osd_debug_deep_scrub_sleep
);
628 if (sleeptime
!= utime_t()) {
629 lgeneric_derr(cct
) << __func__
<< " sleeping for " << sleeptime
<< dendl
;
633 ceph_assert(poid
== pos
.ls
[pos
.pos
]);
634 if (!pos
.data_done()) {
635 if (pos
.data_pos
== 0) {
636 pos
.data_hash
= bufferhash(-1);
643 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
645 cct
->_conf
->osd_deep_scrub_stride
, bl
,
648 dout(20) << __func__
<< " " << poid
<< " got "
649 << r
<< " on read, read_error" << dendl
;
657 if (r
== cct
->_conf
->osd_deep_scrub_stride
) {
658 dout(20) << __func__
<< " " << poid
<< " more data, digest so far 0x"
659 << std::hex
<< pos
.data_hash
.digest() << std::dec
<< dendl
;
664 o
.digest
= pos
.data_hash
.digest();
665 o
.digest_present
= true;
666 dout(20) << __func__
<< " " << poid
<< " done with data, digest 0x"
667 << std::hex
<< o
.digest
<< std::dec
<< dendl
;
671 if (pos
.omap_pos
.empty()) {
672 pos
.omap_hash
= bufferhash(-1);
675 r
= store
->omap_get_header(
678 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
681 dout(20) << __func__
<< " " << poid
<< " got "
682 << r
<< " on omap header read, read_error" << dendl
;
686 if (r
== 0 && hdrbl
.length()) {
687 bool encoded
= false;
688 dout(25) << "CRC header " << cleanbin(hdrbl
, encoded
, true) << dendl
;
689 pos
.omap_hash
<< hdrbl
;
694 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
697 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
699 if (pos
.omap_pos
.length()) {
700 iter
->lower_bound(pos
.omap_pos
);
702 iter
->seek_to_first();
704 int max
= g_conf()->osd_deep_scrub_keys
;
705 while (iter
->status() == 0 && iter
->valid()) {
706 pos
.omap_bytes
+= iter
->value().length();
709 // fixme: we can do this more efficiently.
711 encode(iter
->key(), bl
);
712 encode(iter
->value(), bl
);
717 if (iter
->valid() && max
== 0) {
718 pos
.omap_pos
= iter
->key();
721 if (iter
->status() < 0) {
722 dout(25) << __func__
<< " " << poid
723 << " on omap scan, db status error" << dendl
;
729 if (pos
.omap_keys
> cct
->_conf
->
730 osd_deep_scrub_large_omap_object_key_threshold
||
731 pos
.omap_bytes
> cct
->_conf
->
732 osd_deep_scrub_large_omap_object_value_sum_threshold
) {
733 dout(25) << __func__
<< " " << poid
734 << " large omap object detected. Object has " << pos
.omap_keys
735 << " keys and size " << pos
.omap_bytes
<< " bytes" << dendl
;
736 o
.large_omap_object_found
= true;
737 o
.large_omap_object_key_count
= pos
.omap_keys
;
738 o
.large_omap_object_value_size
= pos
.omap_bytes
;
739 map
.has_large_omap_object_errors
= true;
742 o
.omap_digest
= pos
.omap_hash
.digest();
743 o
.omap_digest_present
= true;
744 dout(20) << __func__
<< " done with " << poid
<< " omap_digest "
745 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
748 if (pos
.omap_keys
> 0 || pos
.omap_bytes
> 0) {
749 dout(25) << __func__
<< " adding " << pos
.omap_keys
<< " keys and "
750 << pos
.omap_bytes
<< " bytes to pg_stats sums" << dendl
;
751 map
.has_omap_keys
= true;
752 o
.object_omap_bytes
= pos
.omap_bytes
;
753 o
.object_omap_keys
= pos
.omap_keys
;
760 void ReplicatedBackend::_do_push(OpRequestRef op
)
762 auto m
= op
->get_req
<MOSDPGPush
>();
763 ceph_assert(m
->get_type() == MSG_OSD_PG_PUSH
);
764 pg_shard_t from
= m
->from
;
768 vector
<PushReplyOp
> replies
;
769 ObjectStore::Transaction t
;
770 if (get_parent()->check_failsafe_full()) {
771 dout(10) << __func__
<< " Out of space (failsafe) processing push request." << dendl
;
774 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
775 i
!= m
->pushes
.end();
777 replies
.push_back(PushReplyOp());
778 handle_push(from
, *i
, &(replies
.back()), &t
, m
->is_repair
);
781 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
782 reply
->from
= get_parent()->whoami_shard();
783 reply
->set_priority(m
->get_priority());
784 reply
->pgid
= get_info().pgid
;
785 reply
->map_epoch
= m
->map_epoch
;
786 reply
->min_epoch
= m
->min_epoch
;
787 reply
->replies
.swap(replies
);
788 reply
->compute_cost(cct
);
790 t
.register_on_complete(
791 new PG_SendMessageOnConn(
792 get_parent(), reply
, m
->get_connection()));
794 get_parent()->queue_transaction(std::move(t
));
797 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
798 ReplicatedBackend
*bc
;
799 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
801 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
802 : bc(bc
), priority(priority
) {}
804 void finish(ThreadPool::TPHandle
&handle
) override
{
805 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
806 for (auto &&i
: to_continue
) {
807 auto j
= bc
->pulling
.find(i
.hoid
);
808 ceph_assert(j
!= bc
->pulling
.end());
809 ObjectContextRef obc
= j
->second
.obc
;
810 bc
->clear_pull(j
, false /* already did it */);
811 int started
= bc
->start_pushes(i
.hoid
, obc
, h
);
813 bc
->pushing
[i
.hoid
].clear();
814 bc
->get_parent()->on_failed_pull(
815 { bc
->get_parent()->whoami_shard() },
816 i
.hoid
, obc
->obs
.oi
.version
);
817 } else if (!started
) {
818 bc
->get_parent()->on_global_recover(
819 i
.hoid
, i
.stat
, false);
821 handle
.reset_tp_timeout();
823 bc
->run_recovery_op(h
, priority
);
827 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
829 auto m
= op
->get_req
<MOSDPGPush
>();
830 ceph_assert(m
->get_type() == MSG_OSD_PG_PUSH
);
831 pg_shard_t from
= m
->from
;
835 vector
<PullOp
> replies(1);
836 if (get_parent()->check_failsafe_full()) {
837 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push)." << dendl
;
841 ObjectStore::Transaction t
;
842 list
<pull_complete_info
> to_continue
;
843 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
844 i
!= m
->pushes
.end();
846 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
848 replies
.push_back(PullOp());
850 if (!to_continue
.empty()) {
851 C_ReplicatedBackend_OnPullComplete
*c
=
852 new C_ReplicatedBackend_OnPullComplete(
855 c
->to_continue
.swap(to_continue
);
856 t
.register_on_complete(
857 new PG_RecoveryQueueAsync(
859 get_parent()->bless_unlocked_gencontext(c
)));
861 replies
.erase(replies
.end() - 1);
863 if (replies
.size()) {
864 MOSDPGPull
*reply
= new MOSDPGPull
;
865 reply
->from
= parent
->whoami_shard();
866 reply
->set_priority(m
->get_priority());
867 reply
->pgid
= get_info().pgid
;
868 reply
->map_epoch
= m
->map_epoch
;
869 reply
->min_epoch
= m
->min_epoch
;
870 reply
->set_pulls(&replies
);
871 reply
->compute_cost(cct
);
873 t
.register_on_complete(
874 new PG_SendMessageOnConn(
875 get_parent(), reply
, m
->get_connection()));
878 get_parent()->queue_transaction(std::move(t
));
881 void ReplicatedBackend::do_pull(OpRequestRef op
)
883 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
884 ceph_assert(m
->get_type() == MSG_OSD_PG_PULL
);
885 pg_shard_t from
= m
->from
;
887 map
<pg_shard_t
, vector
<PushOp
> > replies
;
888 vector
<PullOp
> pulls
;
889 m
->take_pulls(&pulls
);
890 for (auto& i
: pulls
) {
891 replies
[from
].push_back(PushOp());
892 handle_pull(from
, i
, &(replies
[from
].back()));
894 send_pushes(m
->get_priority(), replies
);
897 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
899 auto m
= op
->get_req
<MOSDPGPushReply
>();
900 ceph_assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
901 pg_shard_t from
= m
->from
;
903 vector
<PushOp
> replies(1);
904 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
905 i
!= m
->replies
.end();
907 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
909 replies
.push_back(PushOp());
911 replies
.erase(replies
.end() - 1);
913 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
914 _replies
[from
].swap(replies
);
915 send_pushes(m
->get_priority(), _replies
);
918 Message
* ReplicatedBackend::generate_subop(
919 const hobject_t
&soid
,
920 const eversion_t
&at_version
,
923 eversion_t pg_trim_to
,
924 eversion_t min_last_complete_ondisk
,
925 hobject_t new_temp_oid
,
926 hobject_t discard_temp_oid
,
927 const bufferlist
&log_entries
,
928 std::optional
<pg_hit_set_history_t
> &hset_hist
,
929 ObjectStore::Transaction
&op_t
,
931 const pg_info_t
&pinfo
)
933 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
934 // forward the write/update/whatever
935 MOSDRepOp
*wr
= new MOSDRepOp(
936 reqid
, parent
->whoami_shard(),
937 spg_t(get_info().pgid
.pgid
, peer
.shard
),
940 parent
->get_last_peering_reset_epoch(),
943 // ship resulting transaction, log entries, and pg_stats
944 if (!parent
->should_send_op(peer
, soid
)) {
945 ObjectStore::Transaction t
;
946 encode(t
, wr
->get_data());
948 encode(op_t
, wr
->get_data());
949 wr
->get_header().data_off
= op_t
.get_data_alignment();
952 wr
->logbl
= log_entries
;
954 if (pinfo
.is_incomplete())
955 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
957 wr
->pg_stats
= get_info().stats
;
959 wr
->pg_trim_to
= pg_trim_to
;
961 if (HAVE_FEATURE(parent
->min_peer_features(), OSD_REPOP_MLCOD
)) {
962 wr
->min_last_complete_ondisk
= min_last_complete_ondisk
;
964 /* Some replicas need this field to be at_version. New replicas
966 wr
->set_rollback_to(at_version
);
969 wr
->new_temp_oid
= new_temp_oid
;
970 wr
->discard_temp_oid
= discard_temp_oid
;
971 wr
->updated_hit_set_history
= hset_hist
;
975 void ReplicatedBackend::issue_op(
976 const hobject_t
&soid
,
977 const eversion_t
&at_version
,
980 eversion_t pg_trim_to
,
981 eversion_t min_last_complete_ondisk
,
982 hobject_t new_temp_oid
,
983 hobject_t discard_temp_oid
,
984 const vector
<pg_log_entry_t
> &log_entries
,
985 std::optional
<pg_hit_set_history_t
> &hset_hist
,
987 ObjectStore::Transaction
&op_t
)
989 if (parent
->get_acting_recovery_backfill_shards().size() > 1) {
991 op
->op
->pg_trace
.event("issue replication ops");
993 set
<pg_shard_t
> replicas
= parent
->get_acting_recovery_backfill_shards();
994 replicas
.erase(parent
->whoami_shard());
995 ss
<< "waiting for subops from " << replicas
;
996 op
->op
->mark_sub_op_sent(ss
.str());
999 // avoid doing the same work in generate_subop
1001 encode(log_entries
, logs
);
1003 for (const auto& shard
: get_parent()->get_acting_recovery_backfill_shards()) {
1004 if (shard
== parent
->whoami_shard()) continue;
1005 const pg_info_t
&pinfo
= parent
->get_shard_info().find(shard
)->second
;
1008 wr
= generate_subop(
1014 min_last_complete_ondisk
,
1022 if (op
->op
&& op
->op
->pg_trace
)
1023 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1024 get_parent()->send_message_osd_cluster(
1025 shard
.osd
, wr
, get_osdmap_epoch());
1031 void ReplicatedBackend::do_repop(OpRequestRef op
)
1033 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1034 auto m
= op
->get_req
<MOSDRepOp
>();
1035 int msg_type
= m
->get_type();
1036 ceph_assert(MSG_OSD_REPOP
== msg_type
);
1038 const hobject_t
& soid
= m
->poid
;
1040 dout(10) << __func__
<< " " << soid
1041 << " v " << m
->version
1042 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1043 << " " << m
->logbl
.length()
1047 ceph_assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1049 dout(30) << __func__
<< " missing before " << get_parent()->get_log().get_missing().get_items() << dendl
;
1050 parent
->maybe_preempt_replica_scrub(soid
);
1052 int ackerosd
= m
->get_source().num();
1056 RepModifyRef
rm(std::make_shared
<RepModify
>());
1058 rm
->ackerosd
= ackerosd
;
1059 rm
->last_complete
= get_info().last_complete
;
1060 rm
->epoch_started
= get_osdmap_epoch();
1062 ceph_assert(m
->logbl
.length());
1063 // shipped transaction and log entries
1064 vector
<pg_log_entry_t
> log
;
1066 auto p
= const_cast<bufferlist
&>(m
->get_data()).cbegin();
1069 if (m
->new_temp_oid
!= hobject_t()) {
1070 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1071 add_temp_obj(m
->new_temp_oid
);
1073 if (m
->discard_temp_oid
!= hobject_t()) {
1074 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1075 if (rm
->opt
.empty()) {
1076 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1077 << " since we won't get the transaction" << dendl
;
1078 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1080 clear_temp_obj(m
->discard_temp_oid
);
1083 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1085 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1087 bool update_snaps
= false;
1088 if (!rm
->opt
.empty()) {
1089 // If the opt is non-empty, we infer we are before
1090 // last_backfill (according to the primary, not our
1091 // not-quite-accurate value), and should update the
1092 // collections now. Otherwise, we do it later on push.
1093 update_snaps
= true;
1096 // flag set to true during async recovery
1098 pg_missing_tracker_t pmissing
= get_parent()->get_local_missing();
1099 if (pmissing
.is_missing(soid
)) {
1101 dout(30) << __func__
<< " is_missing " << pmissing
.is_missing(soid
) << dendl
;
1102 for (auto &&e
: log
) {
1103 dout(30) << " add_next_event entry " << e
<< dendl
;
1104 get_parent()->add_local_next_event(e
);
1105 dout(30) << " entry is_delete " << e
.is_delete() << dendl
;
1109 parent
->update_stats(m
->pg_stats
);
1110 parent
->log_operation(
1112 m
->updated_hit_set_history
,
1114 m
->version
, /* Replicated PGs don't have rollback info */
1115 m
->min_last_complete_ondisk
,
1120 rm
->opt
.register_on_commit(
1121 parent
->bless_context(
1122 new C_OSD_RepModifyCommit(this, rm
)));
1123 vector
<ObjectStore::Transaction
> tls
;
1125 tls
.push_back(std::move(rm
->localt
));
1126 tls
.push_back(std::move(rm
->opt
));
1127 parent
->queue_transactions(tls
, op
);
1128 // op is cleaned up by oncommit/onapply when both are executed
1129 dout(30) << __func__
<< " missing after" << get_parent()->get_log().get_missing().get_items() << dendl
;
1132 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1134 rm
->op
->mark_commit_sent();
1135 rm
->op
->pg_trace
.event("sup_op_commit");
1136 rm
->committed
= true;
1139 auto m
= rm
->op
->get_req
<MOSDRepOp
>();
1140 ceph_assert(m
->get_type() == MSG_OSD_REPOP
);
1141 dout(10) << __func__
<< " on op " << *m
1142 << ", sending commit to osd." << rm
->ackerosd
1144 ceph_assert(get_osdmap()->is_up(rm
->ackerosd
));
1146 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1148 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1150 get_parent()->whoami_shard(),
1151 0, get_osdmap_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1152 reply
->set_last_complete_ondisk(rm
->last_complete
);
1153 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1154 reply
->trace
= rm
->op
->pg_trace
;
1155 get_parent()->send_message_osd_cluster(
1156 rm
->ackerosd
, reply
, get_osdmap_epoch());
1158 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1162 // ===========================================================
1164 void ReplicatedBackend::calc_head_subsets(
1165 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1166 const pg_missing_t
& missing
,
1167 const hobject_t
&last_backfill
,
1168 interval_set
<uint64_t>& data_subset
,
1169 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1170 ObcLockManager
&manager
)
1172 dout(10) << "calc_head_subsets " << head
1173 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1175 uint64_t size
= obc
->obs
.oi
.size
;
1177 data_subset
.insert(0, size
);
1179 if (HAVE_FEATURE(parent
->min_peer_features(), SERVER_OCTOPUS
)) {
1180 const auto it
= missing
.get_items().find(head
);
1181 assert(it
!= missing
.get_items().end());
1182 data_subset
.intersection_of(it
->second
.clean_regions
.get_dirty_regions());
1183 dout(10) << "calc_head_subsets " << head
1184 << " data_subset " << data_subset
<< dendl
;
1187 if (get_parent()->get_pool().allow_incomplete_clones()) {
1188 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1192 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1193 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1198 interval_set
<uint64_t> cloning
;
1199 interval_set
<uint64_t> prev
;
1202 prev
.insert(0, size
);
1204 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1205 c
.snap
= snapset
.clones
[j
];
1206 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1207 if (!missing
.is_missing(c
) &&
1208 c
< last_backfill
&&
1209 get_parent()->try_lock_for_read(c
, manager
)) {
1210 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1211 << " overlap " << prev
<< dendl
;
1215 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1216 << " overlap " << prev
<< dendl
;
1219 cloning
.intersection_of(data_subset
);
1220 if (cloning
.empty()) {
1221 dout(10) << "skipping clone, nothing needs to clone" << dendl
;
1225 if (cloning
.num_intervals() > g_conf().get_val
<uint64_t>("osd_recover_clone_overlap_limit")) {
1226 dout(10) << "skipping clone, too many holes" << dendl
;
1227 get_parent()->release_locks(manager
);
1228 clone_subsets
.clear();
1233 // what's left for us to push?
1234 clone_subsets
[c
] = cloning
;
1235 data_subset
.subtract(cloning
);
1237 dout(10) << "calc_head_subsets " << head
1238 << " data_subset " << data_subset
1239 << " clone_subsets " << clone_subsets
<< dendl
;
1242 void ReplicatedBackend::calc_clone_subsets(
1243 SnapSet
& snapset
, const hobject_t
& soid
,
1244 const pg_missing_t
& missing
,
1245 const hobject_t
&last_backfill
,
1246 interval_set
<uint64_t>& data_subset
,
1247 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1248 ObcLockManager
&manager
)
1250 dout(10) << "calc_clone_subsets " << soid
1251 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1253 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1255 data_subset
.insert(0, size
);
1257 if (get_parent()->get_pool().allow_incomplete_clones()) {
1258 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1262 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1263 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1268 for (i
=0; i
< snapset
.clones
.size(); i
++)
1269 if (snapset
.clones
[i
] == soid
.snap
)
1272 // any overlap with next older clone?
1273 interval_set
<uint64_t> cloning
;
1274 interval_set
<uint64_t> prev
;
1276 prev
.insert(0, size
);
1277 for (int j
=i
-1; j
>=0; j
--) {
1279 c
.snap
= snapset
.clones
[j
];
1280 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1281 if (!missing
.is_missing(c
) &&
1282 c
< last_backfill
&&
1283 get_parent()->try_lock_for_read(c
, manager
)) {
1284 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1285 << " overlap " << prev
<< dendl
;
1286 clone_subsets
[c
] = prev
;
1287 cloning
.union_of(prev
);
1290 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1291 << " overlap " << prev
<< dendl
;
1294 // overlap with next newest?
1295 interval_set
<uint64_t> next
;
1297 next
.insert(0, size
);
1298 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1300 c
.snap
= snapset
.clones
[j
];
1301 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1302 if (!missing
.is_missing(c
) &&
1303 c
< last_backfill
&&
1304 get_parent()->try_lock_for_read(c
, manager
)) {
1305 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1306 << " overlap " << next
<< dendl
;
1307 clone_subsets
[c
] = next
;
1308 cloning
.union_of(next
);
1311 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1312 << " overlap " << next
<< dendl
;
1315 if (cloning
.num_intervals() > g_conf().get_val
<uint64_t>("osd_recover_clone_overlap_limit")) {
1316 dout(10) << "skipping clone, too many holes" << dendl
;
1317 get_parent()->release_locks(manager
);
1318 clone_subsets
.clear();
1323 // what's left for us to push?
1324 data_subset
.subtract(cloning
);
1326 dout(10) << "calc_clone_subsets " << soid
1327 << " data_subset " << data_subset
1328 << " clone_subsets " << clone_subsets
<< dendl
;
1331 void ReplicatedBackend::prepare_pull(
1333 const hobject_t
& soid
,
1334 ObjectContextRef headctx
,
1337 const auto missing_iter
= get_parent()->get_local_missing().get_items().find(soid
);
1338 ceph_assert(missing_iter
!= get_parent()->get_local_missing().get_items().end());
1339 eversion_t _v
= missing_iter
->second
.need
;
1340 ceph_assert(_v
== v
);
1341 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1342 get_parent()->get_missing_loc_shards());
1343 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1344 get_parent()->get_shard_missing());
1345 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1346 ceph_assert(q
!= missing_loc
.end());
1347 ceph_assert(!q
->second
.empty());
1350 auto p
= q
->second
.end();
1351 if (cct
->_conf
->osd_debug_feed_pullee
>= 0) {
1352 for (auto it
= q
->second
.begin(); it
!= q
->second
.end(); it
++) {
1353 if (it
->osd
== cct
->_conf
->osd_debug_feed_pullee
) {
1359 if (p
== q
->second
.end()) {
1360 // probably because user feed a wrong pullee
1361 p
= q
->second
.begin();
1363 util::generate_random_number
<int>(0,
1364 q
->second
.size() - 1));
1366 ceph_assert(get_osdmap()->is_up(p
->osd
));
1367 pg_shard_t fromshard
= *p
;
1369 dout(7) << "pull " << soid
1371 << " on osds " << q
->second
1372 << " from osd." << fromshard
1375 ceph_assert(peer_missing
.count(fromshard
));
1376 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1377 if (pmissing
.is_missing(soid
, v
)) {
1378 ceph_assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1379 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1380 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1381 << " rather than at version " << v
<< dendl
;
1382 v
= pmissing
.get_items().find(soid
)->second
.have
;
1383 ceph_assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1384 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1385 pg_log_entry_t::LOST_REVERT
) &&
1386 (get_parent()->get_log().get_log().objects
.find(
1387 soid
)->second
->reverting_to
==
1391 ObjectRecoveryInfo recovery_info
;
1392 ObcLockManager lock_manager
;
1394 if (soid
.is_snap()) {
1395 ceph_assert(!get_parent()->get_local_missing().is_missing(soid
.get_head()));
1396 ceph_assert(headctx
);
1398 SnapSetContext
*ssc
= headctx
->ssc
;
1400 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1401 recovery_info
.ss
= ssc
->snapset
;
1403 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1404 get_info().last_backfill
,
1405 recovery_info
.copy_subset
,
1406 recovery_info
.clone_subset
,
1408 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1409 dout(10) << " pulling " << recovery_info
<< dendl
;
1411 ceph_assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1412 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1413 recovery_info
.object_exist
= missing_iter
->second
.clean_regions
.object_is_exist();
1415 // pulling head or unversioned object.
1416 // always pull the whole thing.
1417 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1418 if (HAVE_FEATURE(parent
->min_peer_features(), SERVER_OCTOPUS
))
1419 recovery_info
.copy_subset
.intersection_of(missing_iter
->second
.clean_regions
.get_dirty_regions());
1420 recovery_info
.size
= ((uint64_t)-1);
1421 recovery_info
.object_exist
= missing_iter
->second
.clean_regions
.object_is_exist();
1424 h
->pulls
[fromshard
].push_back(PullOp());
1425 PullOp
&op
= h
->pulls
[fromshard
].back();
1428 op
.recovery_info
= recovery_info
;
1429 op
.recovery_info
.soid
= soid
;
1430 op
.recovery_info
.version
= v
;
1431 op
.recovery_progress
.data_complete
= false;
1432 op
.recovery_progress
.omap_complete
= !missing_iter
->second
.clean_regions
.omap_is_dirty()
1433 && HAVE_FEATURE(parent
->min_peer_features(), SERVER_OCTOPUS
);
1434 op
.recovery_progress
.data_recovered_to
= 0;
1435 op
.recovery_progress
.first
= true;
1437 ceph_assert(!pulling
.count(soid
));
1438 pull_from_peer
[fromshard
].insert(soid
);
1439 PullInfo
&pi
= pulling
[soid
];
1440 pi
.from
= fromshard
;
1442 pi
.head_ctx
= headctx
;
1443 pi
.recovery_info
= op
.recovery_info
;
1444 pi
.recovery_progress
= op
.recovery_progress
;
1445 pi
.cache_dont_need
= h
->cache_dont_need
;
1446 pi
.lock_manager
= std::move(lock_manager
);
1450 * intelligently push an object to a replica. make use of existing
1451 * clones/heads and dup data ranges where possible.
1453 int ReplicatedBackend::prep_push_to_replica(
1454 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1455 PushOp
*pop
, bool cache_dont_need
)
1457 const object_info_t
& oi
= obc
->obs
.oi
;
1458 uint64_t size
= obc
->obs
.oi
.size
;
1460 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1461 << " size " << size
<< " to osd." << peer
<< dendl
;
1463 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1464 interval_set
<uint64_t> data_subset
;
1466 ObcLockManager lock_manager
;
1467 // are we doing a clone on the replica?
1468 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1469 hobject_t head
= soid
;
1470 head
.snap
= CEPH_NOSNAP
;
1472 // try to base push off of clones that succeed/preceed poid
1473 // we need the head (and current SnapSet) locally to do that.
1474 if (get_parent()->get_local_missing().is_missing(head
)) {
1475 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1476 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1479 SnapSetContext
*ssc
= obc
->ssc
;
1481 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1482 pop
->recovery_info
.ss
= ssc
->snapset
;
1483 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1484 get_parent()->get_shard_missing().find(peer
);
1485 ceph_assert(pm
!= get_parent()->get_shard_missing().end());
1486 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1487 get_parent()->get_shard_info().find(peer
);
1488 ceph_assert(pi
!= get_parent()->get_shard_info().end());
1492 pi
->second
.last_backfill
,
1493 data_subset
, clone_subsets
,
1495 } else if (soid
.snap
== CEPH_NOSNAP
) {
1496 // pushing head or unversioned object.
1497 // base this on partially on replica's clones?
1498 SnapSetContext
*ssc
= obc
->ssc
;
1500 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1503 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1504 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1505 data_subset
, clone_subsets
,
1518 std::move(lock_manager
));
1521 int ReplicatedBackend::prep_push(ObjectContextRef obc
,
1522 const hobject_t
& soid
, pg_shard_t peer
,
1523 PushOp
*pop
, bool cache_dont_need
)
1525 interval_set
<uint64_t> data_subset
;
1526 if (obc
->obs
.oi
.size
)
1527 data_subset
.insert(0, obc
->obs
.oi
.size
);
1528 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1530 return prep_push(obc
, soid
, peer
,
1531 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1532 pop
, cache_dont_need
, ObcLockManager());
1535 int ReplicatedBackend::prep_push(
1536 ObjectContextRef obc
,
1537 const hobject_t
& soid
, pg_shard_t peer
,
1539 interval_set
<uint64_t> &data_subset
,
1540 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1542 bool cache_dont_need
,
1543 ObcLockManager
&&lock_manager
)
1545 get_parent()->begin_peer_recover(peer
, soid
);
1546 const auto pmissing_iter
= get_parent()->get_shard_missing().find(peer
);
1547 const auto missing_iter
= pmissing_iter
->second
.get_items().find(soid
);
1548 assert(missing_iter
!= pmissing_iter
->second
.get_items().end());
1550 PushInfo
&pi
= pushing
[soid
][peer
];
1552 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1553 pi
.recovery_info
.copy_subset
= data_subset
;
1554 pi
.recovery_info
.clone_subset
= clone_subsets
;
1555 pi
.recovery_info
.soid
= soid
;
1556 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1557 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1558 pi
.recovery_info
.version
= version
;
1559 pi
.recovery_info
.object_exist
= missing_iter
->second
.clean_regions
.object_is_exist();
1560 pi
.recovery_progress
.omap_complete
= !missing_iter
->second
.clean_regions
.omap_is_dirty() &&
1561 HAVE_FEATURE(parent
->min_peer_features(), SERVER_OCTOPUS
);
1562 pi
.lock_manager
= std::move(lock_manager
);
1564 ObjectRecoveryProgress new_progress
;
1565 int r
= build_push_op(pi
.recovery_info
,
1566 pi
.recovery_progress
,
1569 &(pi
.stat
), cache_dont_need
);
1572 pi
.recovery_progress
= new_progress
;
1576 void ReplicatedBackend::submit_push_data(
1577 const ObjectRecoveryInfo
&recovery_info
,
1581 bool cache_dont_need
,
1582 interval_set
<uint64_t> &data_zeros
,
1583 const interval_set
<uint64_t> &intervals_included
,
1584 bufferlist data_included
,
1585 bufferlist omap_header
,
1586 const map
<string
, bufferlist
> &attrs
,
1587 const map
<string
, bufferlist
> &omap_entries
,
1588 ObjectStore::Transaction
*t
)
1590 hobject_t target_oid
;
1591 if (first
&& complete
) {
1592 target_oid
= recovery_info
.soid
;
1594 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1595 recovery_info
.version
);
1597 dout(10) << __func__
<< ": Adding oid "
1598 << target_oid
<< " in the temp collection" << dendl
;
1599 add_temp_obj(target_oid
);
1605 t
->remove(coll
, ghobject_t(target_oid
));
1606 t
->touch(coll
, ghobject_t(target_oid
));
1607 bufferlist bv
= attrs
.at(OI_ATTR
);
1608 object_info_t
oi(bv
);
1609 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1610 oi
.expected_object_size
,
1611 oi
.expected_write_size
,
1612 oi
.alloc_hint_flags
);
1614 if (!recovery_info
.object_exist
) {
1615 t
->remove(coll
, ghobject_t(target_oid
));
1616 t
->touch(coll
, ghobject_t(target_oid
));
1617 bufferlist bv
= attrs
.at(OI_ATTR
);
1618 object_info_t
oi(bv
);
1619 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1620 oi
.expected_object_size
,
1621 oi
.expected_write_size
,
1622 oi
.alloc_hint_flags
);
1624 //remove xattr and update later if overwrite on original object
1625 t
->rmattrs(coll
, ghobject_t(target_oid
));
1626 //if need update omap, clear the previous content first
1628 t
->omap_clear(coll
, ghobject_t(target_oid
));
1631 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1632 if (omap_header
.length())
1633 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
1636 int r
= store
->stat(ch
, ghobject_t(recovery_info
.soid
), &st
);
1637 if (get_parent()->pg_is_remote_backfilling()) {
1641 // Don't need to do anything if object is still the same size
1642 if (size
!= recovery_info
.oi
.size
) {
1643 get_parent()->pg_add_local_num_bytes((int64_t)recovery_info
.oi
.size
- (int64_t)size
);
1644 get_parent()->pg_add_num_bytes((int64_t)recovery_info
.oi
.size
- (int64_t)size
);
1645 dout(10) << __func__
<< " " << recovery_info
.soid
1646 << " backfill size " << recovery_info
.oi
.size
1647 << " previous size " << size
1648 << " net size " << recovery_info
.oi
.size
- size
1653 //clone overlap content in local object
1654 if (recovery_info
.object_exist
) {
1656 uint64_t local_size
= std::min(recovery_info
.size
, (uint64_t)st
.st_size
);
1657 interval_set
<uint64_t> local_intervals_included
, local_intervals_excluded
;
1659 local_intervals_included
.insert(0, local_size
);
1660 local_intervals_excluded
.intersection_of(local_intervals_included
, recovery_info
.copy_subset
);
1661 local_intervals_included
.subtract(local_intervals_excluded
);
1663 for (interval_set
<uint64_t>::const_iterator q
= local_intervals_included
.begin();
1664 q
!= local_intervals_included
.end();
1666 dout(15) << " clone_range " << recovery_info
.soid
<< " "
1667 << q
.get_start() << "~" << q
.get_len() << dendl
;
1668 t
->clone_range(coll
, ghobject_t(recovery_info
.soid
), ghobject_t(target_oid
),
1669 q
.get_start(), q
.get_len(), q
.get_start());
1675 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1676 if (cache_dont_need
)
1677 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1678 // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
1679 if (data_zeros
.size() > 0) {
1680 data_zeros
.intersection_of(recovery_info
.copy_subset
);
1681 assert(intervals_included
.subset_of(data_zeros
));
1682 data_zeros
.subtract(intervals_included
);
1684 dout(20) << __func__
<<" recovering object " << recovery_info
.soid
1685 << " copy_subset: " << recovery_info
.copy_subset
1686 << " intervals_included: " << intervals_included
1687 << " data_zeros: " << data_zeros
<< dendl
;
1689 for (auto p
= data_zeros
.begin(); p
!= data_zeros
.end(); ++p
)
1690 t
->zero(coll
, ghobject_t(target_oid
), p
.get_start(), p
.get_len());
1692 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1693 p
!= intervals_included
.end();
1696 bit
.substr_of(data_included
, off
, p
.get_len());
1697 t
->write(coll
, ghobject_t(target_oid
),
1698 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1702 if (!omap_entries
.empty())
1703 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1705 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1709 dout(10) << __func__
<< ": Removing oid "
1710 << target_oid
<< " from the temp collection" << dendl
;
1711 clear_temp_obj(target_oid
);
1712 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1713 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1714 coll
, ghobject_t(recovery_info
.soid
));
1717 submit_push_complete(recovery_info
, t
);
1722 void ReplicatedBackend::submit_push_complete(
1723 const ObjectRecoveryInfo
&recovery_info
,
1724 ObjectStore::Transaction
*t
)
1726 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1727 recovery_info
.clone_subset
.begin();
1728 p
!= recovery_info
.clone_subset
.end();
1730 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1731 q
!= p
->second
.end();
1733 dout(15) << " clone_range " << p
->first
<< " "
1734 << q
.get_start() << "~" << q
.get_len() << dendl
;
1735 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1736 q
.get_start(), q
.get_len(), q
.get_start());
1741 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1742 const ObjectRecoveryInfo
& recovery_info
,
1743 SnapSetContext
*ssc
,
1744 ObcLockManager
&manager
)
1746 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1747 return recovery_info
;
1748 ObjectRecoveryInfo new_info
= recovery_info
;
1749 new_info
.copy_subset
.clear();
1750 new_info
.clone_subset
.clear();
1752 get_parent()->release_locks(manager
); // might already have locks
1754 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1755 get_info().last_backfill
,
1756 new_info
.copy_subset
, new_info
.clone_subset
,
1761 bool ReplicatedBackend::handle_pull_response(
1762 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1763 list
<pull_complete_info
> *to_continue
,
1764 ObjectStore::Transaction
*t
)
1766 interval_set
<uint64_t> data_included
= pop
.data_included
;
1769 dout(10) << "handle_pull_response "
1770 << pop
.recovery_info
1771 << pop
.after_progress
1772 << " data.size() is " << data
.length()
1773 << " data_included: " << data_included
1775 if (pop
.version
== eversion_t()) {
1776 // replica doesn't have it!
1777 _failed_pull(from
, pop
.soid
);
1781 const hobject_t
&hoid
= pop
.soid
;
1782 ceph_assert((data_included
.empty() && data
.length() == 0) ||
1783 (!data_included
.empty() && data
.length() > 0));
1785 auto piter
= pulling
.find(hoid
);
1786 if (piter
== pulling
.end()) {
1790 PullInfo
&pi
= piter
->second
;
1791 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1792 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1793 pi
.recovery_info
.copy_subset
.intersection_of(
1794 pop
.recovery_info
.copy_subset
);
1796 // If primary doesn't have object info and didn't know version
1797 if (pi
.recovery_info
.version
== eversion_t()) {
1798 pi
.recovery_info
.version
= pop
.version
;
1801 bool first
= pi
.recovery_progress
.first
;
1803 // attrs only reference the origin bufferlist (decode from
1804 // MOSDPGPush message) whose size is much greater than attrs in
1805 // recovery. If obc cache it (get_obc maybe cache the attr), this
1806 // causes the whole origin bufferlist would not be free until obc
1807 // is evicted from obc cache. So rebuild the bufferlists before
1809 auto attrset
= pop
.attrset
;
1810 for (auto& a
: attrset
) {
1813 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1814 if (attrset
.find(SS_ATTR
) != attrset
.end()) {
1815 bufferlist ssbv
= attrset
.at(SS_ATTR
);
1817 assert(!pi
.obc
->ssc
->exists
|| ss
.seq
== pi
.obc
->ssc
->snapset
.seq
);
1819 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1820 pi
.recovery_info
= recalc_subsets(
1827 interval_set
<uint64_t> usable_intervals
;
1828 bufferlist usable_data
;
1829 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1834 data_included
= usable_intervals
;
1835 data
.claim(usable_data
);
1838 pi
.recovery_progress
= pop
.after_progress
;
1840 dout(10) << "new recovery_info " << pi
.recovery_info
1841 << ", new progress " << pi
.recovery_progress
1843 interval_set
<uint64_t> data_zeros
;
1844 uint64_t z_offset
= pop
.before_progress
.data_recovered_to
;
1845 uint64_t z_length
= pop
.after_progress
.data_recovered_to
- pop
.before_progress
.data_recovered_to
;
1847 data_zeros
.insert(z_offset
, z_length
);
1848 bool complete
= pi
.is_complete();
1849 bool clear_omap
= !pop
.before_progress
.omap_complete
;
1851 submit_push_data(pi
.recovery_info
,
1864 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1865 pi
.stat
.num_bytes_recovered
+= data
.length();
1866 get_parent()->get_logger()->inc(l_osd_rbytes
, pop
.omap_entries
.size() + data
.length());
1869 pi
.stat
.num_objects_recovered
++;
1870 // XXX: This could overcount if regular recovery is needed right after a repair
1871 if (get_parent()->pg_is_repair()) {
1872 pi
.stat
.num_objects_repaired
++;
1873 get_parent()->inc_osd_stat_repaired();
1875 clear_pull_from(piter
);
1876 to_continue
->push_back({hoid
, pi
.stat
});
1877 get_parent()->on_local_recover(
1878 hoid
, pi
.recovery_info
, pi
.obc
, false, t
);
1881 response
->soid
= pop
.soid
;
1882 response
->recovery_info
= pi
.recovery_info
;
1883 response
->recovery_progress
= pi
.recovery_progress
;
1888 void ReplicatedBackend::handle_push(
1889 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1890 ObjectStore::Transaction
*t
, bool is_repair
)
1892 dout(10) << "handle_push "
1893 << pop
.recovery_info
1894 << pop
.after_progress
1898 bool first
= pop
.before_progress
.first
;
1899 bool complete
= pop
.after_progress
.data_complete
&&
1900 pop
.after_progress
.omap_complete
;
1901 bool clear_omap
= !pop
.before_progress
.omap_complete
;
1902 interval_set
<uint64_t> data_zeros
;
1903 uint64_t z_offset
= pop
.before_progress
.data_recovered_to
;
1904 uint64_t z_length
= pop
.after_progress
.data_recovered_to
- pop
.before_progress
.data_recovered_to
;
1906 data_zeros
.insert(z_offset
, z_length
);
1907 response
->soid
= pop
.recovery_info
.soid
;
1909 submit_push_data(pop
.recovery_info
,
1913 true, // must be replicate
1924 get_parent()->inc_osd_stat_repaired();
1925 dout(20) << __func__
<< " repair complete" << dendl
;
1927 get_parent()->on_local_recover(
1928 pop
.recovery_info
.soid
,
1930 ObjectContextRef(), // ok, is replica
1936 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1938 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1941 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1943 get_osdmap_epoch());
1946 vector
<PushOp
>::iterator j
= i
->second
.begin();
1947 while (j
!= i
->second
.end()) {
1949 uint64_t pushes
= 0;
1950 MOSDPGPush
*msg
= new MOSDPGPush();
1951 msg
->from
= get_parent()->whoami_shard();
1952 msg
->pgid
= get_parent()->primary_spg_t();
1953 msg
->map_epoch
= get_osdmap_epoch();
1954 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1955 msg
->set_priority(prio
);
1956 msg
->is_repair
= get_parent()->pg_is_repair();
1958 (j
!= i
->second
.end() &&
1959 cost
< cct
->_conf
->osd_max_push_cost
&&
1960 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1962 dout(20) << __func__
<< ": sending push " << *j
1963 << " to osd." << i
->first
<< dendl
;
1964 cost
+= j
->cost(cct
);
1966 msg
->pushes
.push_back(*j
);
1968 msg
->set_cost(cost
);
1969 get_parent()->send_message_osd_cluster(msg
, con
);
1974 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1976 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1979 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1981 get_osdmap_epoch());
1984 dout(20) << __func__
<< ": sending pulls " << i
->second
1985 << " to osd." << i
->first
<< dendl
;
1986 MOSDPGPull
*msg
= new MOSDPGPull();
1987 msg
->from
= parent
->whoami_shard();
1988 msg
->set_priority(prio
);
1989 msg
->pgid
= get_parent()->primary_spg_t();
1990 msg
->map_epoch
= get_osdmap_epoch();
1991 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1992 msg
->set_pulls(&i
->second
);
1993 msg
->compute_cost(cct
);
1994 get_parent()->send_message_osd_cluster(msg
, con
);
1998 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1999 const ObjectRecoveryProgress
&progress
,
2000 ObjectRecoveryProgress
*out_progress
,
2002 object_stat_sum_t
*stat
,
2003 bool cache_dont_need
)
2005 ObjectRecoveryProgress _new_progress
;
2007 out_progress
= &_new_progress
;
2008 ObjectRecoveryProgress
&new_progress
= *out_progress
;
2009 new_progress
= progress
;
2011 dout(7) << __func__
<< " " << recovery_info
.soid
2012 << " v " << recovery_info
.version
2013 << " size " << recovery_info
.size
2014 << " recovery_info: " << recovery_info
2017 eversion_t v
= recovery_info
.version
;
2019 if (progress
.first
) {
2020 int r
= store
->omap_get_header(ch
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
2022 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
2025 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
2027 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
2032 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
2034 auto bliter
= bv
.cbegin();
2037 dout(0) << __func__
<< ": bad object_info_t: " << recovery_info
.soid
<< dendl
;
2041 // If requestor didn't know the version, use ours
2042 if (v
== eversion_t()) {
2044 } else if (oi
.version
!= v
) {
2045 get_parent()->clog_error() << get_info().pgid
<< " push "
2046 << recovery_info
.soid
<< " v "
2047 << recovery_info
.version
2048 << " failed because local copy is "
2053 new_progress
.first
= false;
2055 // Once we provide the version subsequent requests will have it, so
2056 // at this point it must be known.
2057 ceph_assert(v
!= eversion_t());
2059 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
2060 if (!progress
.omap_complete
) {
2061 ObjectMap::ObjectMapIterator iter
=
2062 store
->get_omap_iterator(ch
,
2063 ghobject_t(recovery_info
.soid
));
2065 for (iter
->lower_bound(progress
.omap_recovered_to
);
2068 if (!out_op
->omap_entries
.empty() &&
2069 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
2070 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
2071 available
<= iter
->key().size() + iter
->value().length()))
2073 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
2075 if ((iter
->key().size() + iter
->value().length()) <= available
)
2076 available
-= (iter
->key().size() + iter
->value().length());
2081 new_progress
.omap_complete
= true;
2083 new_progress
.omap_recovered_to
= iter
->key();
2086 if (available
> 0) {
2087 if (!recovery_info
.copy_subset
.empty()) {
2088 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
2089 map
<uint64_t, uint64_t> m
;
2090 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
2091 copy_subset
.range_end(), m
);
2093 interval_set
<uint64_t> fiemap_included(std::move(m
));
2094 copy_subset
.intersection_of(fiemap_included
);
2096 // intersection of copy_subset and empty interval_set would be empty anyway
2097 copy_subset
.clear();
2100 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
2102 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
2103 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2105 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2108 out_op
->data_included
.clear();
2111 auto origin_size
= out_op
->data_included
.size();
2113 int r
= store
->readv(ch
, ghobject_t(recovery_info
.soid
),
2114 out_op
->data_included
, bit
,
2115 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
2116 if (cct
->_conf
->osd_debug_random_push_read_error
&&
2117 (rand() % (int)(cct
->_conf
->osd_debug_random_push_read_error
* 100.0)) == 0) {
2118 dout(0) << __func__
<< ": inject EIO " << recovery_info
.soid
<< dendl
;
2124 if (out_op
->data_included
.size() != origin_size
) {
2125 dout(10) << __func__
<< " some extents get pruned "
2126 << out_op
->data_included
.size() << "/" << origin_size
2128 new_progress
.data_complete
= true;
2130 out_op
->data
.claim_append(bit
);
2131 if (progress
.first
&& !out_op
->data_included
.empty() &&
2132 out_op
->data_included
.begin().get_start() == 0 &&
2133 out_op
->data
.length() == oi
.size
&& oi
.is_data_digest()) {
2134 uint32_t crc
= out_op
->data
.crc32c(-1);
2135 if (oi
.data_digest
!= crc
) {
2136 dout(0) << __func__
<< " " << coll
<< std::hex
2137 << " full-object read crc 0x" << crc
2138 << " != expected 0x" << oi
.data_digest
2139 << std::dec
<< " on " << recovery_info
.soid
<< dendl
;
2144 if (new_progress
.is_complete(recovery_info
)) {
2145 new_progress
.data_complete
= true;
2147 stat
->num_objects_recovered
++;
2148 if (get_parent()->pg_is_repair())
2149 stat
->num_objects_repaired
++;
2151 } else if (progress
.first
&& progress
.omap_complete
) {
2152 // If omap is not changed, we need recovery omap when recovery cannot be completed once
2153 new_progress
.omap_complete
= false;
2157 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2158 stat
->num_bytes_recovered
+= out_op
->data
.length();
2159 get_parent()->get_logger()->inc(l_osd_rbytes
, out_op
->omap_entries
.size() + out_op
->data
.length());
2162 get_parent()->get_logger()->inc(l_osd_push
);
2163 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2166 out_op
->version
= v
;
2167 out_op
->soid
= recovery_info
.soid
;
2168 out_op
->recovery_info
= recovery_info
;
2169 out_op
->after_progress
= new_progress
;
2170 out_op
->before_progress
= progress
;
2174 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2176 op
->recovery_info
.version
= eversion_t();
2177 op
->version
= eversion_t();
2181 bool ReplicatedBackend::handle_push_reply(
2182 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2184 const hobject_t
&soid
= op
.soid
;
2185 if (pushing
.count(soid
) == 0) {
2186 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2187 << ", or anybody else"
2190 } else if (pushing
[soid
].count(peer
) == 0) {
2191 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2195 PushInfo
*pi
= &pushing
[soid
][peer
];
2196 bool error
= pushing
[soid
].begin()->second
.recovery_progress
.error
;
2198 if (!pi
->recovery_progress
.data_complete
&& !error
) {
2199 dout(10) << " pushing more from, "
2200 << pi
->recovery_progress
.data_recovered_to
2201 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2202 ObjectRecoveryProgress new_progress
;
2203 int r
= build_push_op(
2205 pi
->recovery_progress
, &new_progress
, reply
,
2207 // Handle the case of a read error right after we wrote, which is
2208 // hopefully extremely rare.
2210 dout(5) << __func__
<< ": oid " << soid
<< " error " << r
<< dendl
;
2215 pi
->recovery_progress
= new_progress
;
2221 get_parent()->on_peer_recover( peer
, soid
, pi
->recovery_info
);
2223 get_parent()->release_locks(pi
->lock_manager
);
2224 object_stat_sum_t stat
= pi
->stat
;
2225 eversion_t v
= pi
->recovery_info
.version
;
2226 pushing
[soid
].erase(peer
);
2229 if (pushing
[soid
].empty()) {
2231 get_parent()->on_global_recover(soid
, stat
, false);
2233 get_parent()->on_failed_pull(
2234 std::set
<pg_shard_t
>{ get_parent()->whoami_shard() },
2237 pushing
.erase(soid
);
2239 // This looks weird, but we erased the current peer and need to remember
2240 // the error on any other one, while getting more acks.
2242 pushing
[soid
].begin()->second
.recovery_progress
.error
= true;
2243 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2244 << pushing
[soid
].size() << " others" << dendl
;
2251 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2253 const hobject_t
&soid
= op
.soid
;
2255 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2257 get_parent()->clog_error() << get_info().pgid
<< " "
2258 << peer
<< " tried to pull " << soid
2259 << " but got " << cpp_strerror(-r
);
2260 prep_push_op_blank(soid
, reply
);
2262 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2263 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2264 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2265 // Adjust size and copy_subset
2266 recovery_info
.size
= st
.st_size
;
2268 interval_set
<uint64_t> object_range
;
2269 object_range
.insert(0, st
.st_size
);
2270 recovery_info
.copy_subset
.intersection_of(object_range
);
2272 recovery_info
.copy_subset
.clear();
2274 assert(recovery_info
.clone_subset
.empty());
2277 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2279 prep_push_op_blank(soid
, reply
);
2284 * trim received data to remove what we don't want
2286 * @param copy_subset intervals we want
2287 * @param data_included intervals we got
2288 * @param data_recieved data we got
2289 * @param intervals_usable intervals we want to keep
2290 * @param data_usable matching data we want to keep
2292 void ReplicatedBackend::trim_pushed_data(
2293 const interval_set
<uint64_t> ©_subset
,
2294 const interval_set
<uint64_t> &intervals_received
,
2295 bufferlist data_received
,
2296 interval_set
<uint64_t> *intervals_usable
,
2297 bufferlist
*data_usable
)
2299 if (intervals_received
.subset_of(copy_subset
)) {
2300 *intervals_usable
= intervals_received
;
2301 *data_usable
= data_received
;
2305 intervals_usable
->intersection_of(copy_subset
,
2306 intervals_received
);
2309 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2310 p
!= intervals_received
.end();
2312 interval_set
<uint64_t> x
;
2313 x
.insert(p
.get_start(), p
.get_len());
2314 x
.intersection_of(copy_subset
);
2315 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
2319 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2320 sub
.substr_of(data_received
, data_off
, q
.get_len());
2321 data_usable
->claim_append(sub
);
2327 void ReplicatedBackend::_failed_pull(pg_shard_t from
, const hobject_t
&soid
)
2329 dout(20) << __func__
<< ": " << soid
<< " from " << from
<< dendl
;
2330 auto it
= pulling
.find(soid
);
2331 assert(it
!= pulling
.end());
2332 get_parent()->on_failed_pull(
2335 it
->second
.recovery_info
.version
);
2340 void ReplicatedBackend::clear_pull_from(
2341 map
<hobject_t
, PullInfo
>::iterator piter
)
2343 auto from
= piter
->second
.from
;
2344 pull_from_peer
[from
].erase(piter
->second
.soid
);
2345 if (pull_from_peer
[from
].empty())
2346 pull_from_peer
.erase(from
);
2349 void ReplicatedBackend::clear_pull(
2350 map
<hobject_t
, PullInfo
>::iterator piter
,
2351 bool clear_pull_from_peer
)
2353 if (clear_pull_from_peer
) {
2354 clear_pull_from(piter
);
2356 get_parent()->release_locks(piter
->second
.lock_manager
);
2357 pulling
.erase(piter
);
2360 int ReplicatedBackend::start_pushes(
2361 const hobject_t
&soid
,
2362 ObjectContextRef obc
,
2365 list
< map
<pg_shard_t
, pg_missing_t
>::const_iterator
> shards
;
2367 dout(20) << __func__
<< " soid " << soid
<< dendl
;
2369 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
2370 for (set
<pg_shard_t
>::iterator i
=
2371 get_parent()->get_acting_recovery_backfill_shards().begin();
2372 i
!= get_parent()->get_acting_recovery_backfill_shards().end();
2374 if (*i
== get_parent()->whoami_shard()) continue;
2375 pg_shard_t peer
= *i
;
2376 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2377 get_parent()->get_shard_missing().find(peer
);
2378 ceph_assert(j
!= get_parent()->get_shard_missing().end());
2379 if (j
->second
.is_missing(soid
)) {
2380 shards
.push_back(j
);
2384 // If more than 1 read will occur ignore possible request to not cache
2385 bool cache
= shards
.size() == 1 ? h
->cache_dont_need
: false;
2387 for (auto j
: shards
) {
2388 pg_shard_t peer
= j
->first
;
2389 h
->pushes
[peer
].push_back(PushOp());
2390 int r
= prep_push_to_replica(obc
, soid
, peer
,
2391 &(h
->pushes
[peer
].back()), cache
);
2393 // Back out all failed reads
2394 for (auto k
: shards
) {
2395 pg_shard_t p
= k
->first
;
2396 dout(10) << __func__
<< " clean up peer " << p
<< dendl
;
2397 h
->pushes
[p
].pop_back();
2398 if (p
== peer
) break;
2403 return shards
.size();