// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDRepOp.h"
18 #include "messages/MOSDRepOpReply.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPull.h"
21 #include "messages/MOSDPGPushReply.h"
22 #include "common/EventTrace.h"
23 #include "include/random.h"
24 #include "include/util.h"
26 #include "osd_tracer.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}
using std::less;
using std::list;
using std::map;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using ceph::bufferhash;
using ceph::bufferlist;
using ceph::decode;
using ceph::encode;
namespace {
class PG_SendMessageOnConn : public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(MessageRef(reply, false), conn.get());
  }
};
class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
  uint64_t cost;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c,
    uint64_t cost) : pg(pg), c(c), cost(cost) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release(), cost);
  }
};
}
struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t latency = ceph_clock_now();
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else {
      ceph_abort_msg("unsupported subop");
    }
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}
ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    ceph_assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    ceph_assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
	       << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
	   j != i->second.end();
	   ++j) {
	get_parent()->cancel_pull(*j);
	clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}
bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << *op->get_req() << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}
bool ReplicatedBackend::_handle_message(
  OpRequestRef op)
{
  dout(10) << __func__ << ": " << *op->get_req() << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}
void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i : pushing) {
    for (auto &&j : i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i : pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}
void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (auto& op : in_progress_ops) {
    delete op.second->on_commit;
    op.second->on_commit = nullptr;
  }
  in_progress_ops.clear();
  clear_recovery_state();
}
int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}
int ReplicatedBackend::objects_readv_sync(
  const hobject_t &hoid,
  map<uint64_t, uint64_t>&& m,
  uint32_t op_flags,
  bufferlist *bl)
{
  interval_set<uint64_t> im(std::move(m));
  auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
  if (r >= 0) {
    m = std::move(im).detach();
  }
  return r;
}
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
		  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  ceph_abort_msg("async read is not used by replica pool");
}
class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ceph::ref_t<ReplicatedBackend::InProgressOp> op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
    : pg(pg), op(std::move(op)) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};
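
// Render a PGTransaction into a raw ObjectStore::Transaction. In outline
// (a reading of the traversal below, not a normative spec): each log entry
// is marked unrollbackable (replicated pools keep no rollback info), then
// every per-object operation is translated into store primitives
// (create/touch, clone, rename, truncate, attr/omap updates, buffer
// writes). Temp objects created or deleted along the way are reported via
// *added and *removed so the caller can keep its temp-object set in sync.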
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed,
  const ceph_release_t require_osd_release = ceph_release_t::unknown)
{
  ceph_assert(t);
  ceph_assert(added);
  ceph_assert(removed);

  for (auto &&le : log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
	ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
	if (op.is_fresh_object()) {
	  added->insert(oid);
	} else if (op.is_delete()) {
	  removed->insert(oid);
	}
      }

      if (op.delete_first) {
	t->remove(coll, goid);
      }

      match(
	op.init_type,
	[&](const PGTransaction::ObjectOperation::Init::None &) {
	},
	[&](const PGTransaction::ObjectOperation::Init::Create &op) {
	  if (require_osd_release >= ceph_release_t::octopus) {
	    t->create(coll, goid);
	  } else {
	    t->touch(coll, goid);
	  }
	},
	[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
	  t->clone(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    goid);
	},
	[&](const PGTransaction::ObjectOperation::Init::Rename &op) {
	  ceph_assert(op.source.is_temp());
	  t->collection_move_rename(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    coll,
	    goid);
	});

      if (op.truncate) {
	t->truncate(coll, goid, op.truncate->first);
	if (op.truncate->first != op.truncate->second)
	  t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
	map<string, bufferlist, less<>> attrs;
	for (auto &&p : op.attr_updates) {
	  if (p.second) {
	    attrs[p.first] = *(p.second);
	  } else {
	    t->rmattr(coll, goid, p.first);
	  }
	}
	t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
	t->omap_clear(coll, goid);
      if (op.omap_header)
	t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up : op.omap_updates) {
	using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
	switch (up.first) {
	case UpdateType::Remove:
	  t->omap_rmkeys(coll, goid, up.second);
	  break;
	case UpdateType::Insert:
	  t->omap_setkeys(coll, goid, up.second);
	  break;
	case UpdateType::RemoveRange:
	  t->omap_rmkeyrange(coll, goid, up.second);
	  break;
	}
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
	auto &hint = *(op.alloc_hint);
	t->set_alloc_hint(
	  coll,
	  goid,
	  hint.expected_object_size,
	  hint.expected_write_size,
	  hint.flags);
      }

      for (auto &&extent : op.buffer_updates) {
	using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
	match(
	  extent.get_val(),
	  [&](const BufferUpdate::Write &op) {
	    t->write(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len(),
	      op.buffer,
	      op.fadvise_flags);
	  },
	  [&](const BufferUpdate::Zero &op) {
	    t->zero(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len());
	  },
	  [&](const BufferUpdate::CloneRange &op) {
	    ceph_assert(op.len == extent.get_len());
	    t->clone_range(
	      coll,
	      ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	      goid,
	      op.offset,
	      extent.get_len(),
	      extent.get_off());
	  });
      }
    });
}
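
// Primary-side write path. Roughly: generate_transaction() renders the op
// into op_t, an InProgressOp is registered under its tid with every
// acting/backfill shard listed in waiting_for_commit, issue_op() ships
// MOSDRepOp messages to the replicas, and the local transaction is queued
// with a C_OSD_OnOpCommit callback. on_all_commit fires only once the local
// commit and every replica's MOSDRepOpReply have been accounted for.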
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  vector<pg_log_entry_t>&& _log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed,
    get_osdmap()->require_osd_release);
  ceph_assert(added.size() <= 1);
  ceph_assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      ceph::make_ref<InProgressOp>(
	tid, on_all_commit,
	orig_op, at_version)
      )
    );
  ceph_assert(insert_res.second);
  InProgressOp &op = *insert_res.first->second;

  op.waiting_for_commit.insert(
    parent->get_acting_recovery_backfill_shards().begin(),
    parent->get_acting_recovery_backfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    min_last_complete_ondisk,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    std::move(log_entries),
    hset_history,
    trim_to,
    at_version,
    min_last_complete_ondisk,
    true,
    op_t);

  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
  if (at_version != eversion_t()) {
    parent->op_applied(at_version);
  }
}
void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
{
  if (op->on_commit == nullptr) {
    // on_change canceled us
    return;
  }

  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = nullptr;
    in_progress_ops.erase(op->tid);
  }
}
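
// Replica ack handling. A reply with CEPH_OSD_FLAG_ONDISK removes the
// sender from waiting_for_commit; the op completes when the set drains.
// The peer's last_complete_ondisk is recorded either way, since it feeds
// min_last_complete_ondisk for future subops.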
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  auto r = op->get_req<MOSDRepOpReply>();
  ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  auto iter = in_progress_ops.find(rep_tid);
  if (iter != in_progress_ops.end()) {
    InProgressOp &ip_op = *iter->second;
    const MOSDOp *m = nullptr;
    if (ip_op.op)
      m = ip_op.op->get_req<MOSDOp>();

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
	      << " ack_type " << (int)r->ack_type
	      << " from " << from << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
	      << " ack_type " << (int)r->ack_type
	      << " from " << from << dendl;

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      ceph_assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
	ip_op.op->mark_event("sub_op_commit_rec");
	ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      // legacy peer; ignore
    }

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_commit.empty() &&
	ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = nullptr;
      in_progress_ops.erase(iter);
    }
  }
}
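
// Deep scrub is incremental: each call hashes at most one data stride
// (osd_deep_scrub_stride) or osd_deep_scrub_keys omap keys, records its
// position in the ScrubMapBuilder, and returns -EINPROGRESS until the
// object is fully digested, keeping any single call's I/O bounded.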
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			   CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
			   CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  ceph_assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    const uint64_t stride = cct->_conf->osd_deep_scrub_stride;

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
	       << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (static_cast<uint64_t>(r) == stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
	       << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
	     << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
	       << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      bool encoded = false;
      dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  ceph_assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf()->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    encode(iter->key(), bl);
    encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
	       << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
	osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
	osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
	     << " large omap object detected. Object has " << pos.omap_keys
	     << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
	   << std::hex << o.omap_digest << std::dec << dendl;

  // sum up omap usage
  if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
    dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
	     << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
    map.has_omap_keys = true;
    o.object_omap_bytes = pos.omap_bytes;
    o.object_omap_keys = pos.omap_keys;
  }
  return 0;
}
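
// Replica side of recovery: apply a batch of pushes to the local store and
// answer with a single MOSDPGPushReply once the transaction completes.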
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPush>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t, m->is_repair);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(
    ReplicatedBackend *bc,
    int priority,
    list<ReplicatedBackend::pull_complete_info> &&to_continue)
    : bc(bc), to_continue(std::move(to_continue)), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i : to_continue) {
      auto j = bc->pulling.find(i.hoid);
      ceph_assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
	bc->pushing[i.hoid].clear();
	bc->get_parent()->on_failed_pull(
	  { bc->get_parent()->whoami_shard() },
	  i.hoid, obc->obs.oi.version);
      } else if (!started) {
	bc->get_parent()->on_global_recover(
	  i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }

  /// Estimate total data reads required to perform pushes
  uint64_t estimate_push_costs() const {
    uint64_t cost = 0;
    for (const auto &i : to_continue) {
      cost += i.stat.num_bytes_recovered;
    }
    return cost;
  }
};
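
// Primary side of a pull: each PushOp in the MOSDPGPush either advances an
// in-flight pull (another PullOp is queued for the remainder) or completes
// it, in which case pushing the object on to other shards is kicked off
// from C_ReplicatedBackend_OnPullComplete once the transaction completes.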
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPush>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
	this,
	m->get_priority(),
	std::move(to_continue));
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
	get_parent(),
	get_parent()->bless_unlocked_gencontext(c),
	std::max<uint64_t>(1, c->estimate_push_costs())));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(std::move(replies));
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
	get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}
void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  for (auto& i : m->take_pulls()) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}
void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPushReply>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}
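
// Build the MOSDRepOp for one replica. Note that a peer we should not send
// the op to (e.g. one past its last_backfill) still gets a message, but
// with an empty transaction encoded, so it still participates in the
// commit accounting.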
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const bufferlist &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    ObjectStore::Transaction t;
    encode(t, wr->get_data());
  } else {
    encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  wr->logbl = log_entries;

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;

  if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
    wr->min_last_complete_ondisk = min_last_complete_ondisk;
  } else {
    /* Some replicas need this field to be at_version.  New replicas
     * will ignore it. */
    wr->set_rollback_to(at_version);
  }

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (parent->get_acting_recovery_backfill_shards().size() > 1) {
    if (op->op) {
      op->op->pg_trace.event("issue replication ops");
      ostringstream ss;
      set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
      replicas.erase(parent->whoami_shard());
      ss << "waiting for subops from " << replicas;
      op->op->mark_sub_op_sent(ss.str());
    }

    // avoid doing the same work in generate_subop
    bufferlist logs;
    encode(log_entries, logs);

    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
      if (shard == parent->whoami_shard()) continue;
      const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;

      Message *wr;
      wr = generate_subop(
	soid,
	at_version,
	tid,
	reqid,
	pg_trim_to,
	min_last_complete_ondisk,
	new_temp_oid,
	discard_temp_oid,
	logs,
	hset_hist,
	op_t,
	shard,
	pinfo);
      if (op->op && op->op->pg_trace)
	wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
      get_parent()->send_message_osd_cluster(
	shard.osd, wr, get_osdmap_epoch());
    }
  }
}
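
// Replica handling of a MOSDRepOp: decode the shipped transaction and log
// entries, track temp objects, record the log, and queue both the local
// bookkeeping transaction and the shipped one; C_OSD_RepModifyCommit sends
// the ack once they commit.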
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  auto m = op->get_req<MOSDRepOp>();
  int msg_type = m->get_type();
  ceph_assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
	   << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  ceph_assert(m->map_epoch >= get_info().history.same_interval_since);

  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap_epoch();

  ceph_assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
  decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }

  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
    for (auto &&e : log) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }

  parent->update_stats(m->pg_stats);
  parent->log_operation(
    std::move(log),
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->version, /* Replicated PGs don't have rollback info */
    m->min_last_complete_ondisk,
    update_snaps,
    rm->localt,
    async);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
}
void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  auto m = rm->op->get_req<MOSDRepOp>();
  ceph_assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
	   << ", sending commit to osd." << rm->ackerosd
	   << dendl;
  ceph_assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}
// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  assert(HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS));
  const auto it = missing.get_items().find(head);
  assert(it != missing.get_items().end());
  data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset << dendl;

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  hobject_t c = head;
  if (size)
    prev.insert(0, size);

  for (int j = snapset.clones.size() - 1; j >= 0; j--) {
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      cloning = prev;
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  cloning.intersection_of(data_subset);
  if (cloning.empty()) {
    dout(10) << "skipping clone, nothing needs to clone" << dendl;
    return;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
    return;
  }

  // what's left for us to push?
  clone_subsets[c] = cloning;
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
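
// As above, but for a clone: reuse byte ranges that overlap with the
// neighboring older and newer clones (per snapset.clone_overlap), so only
// the ranges unique to this clone need to travel over the wire; bounded by
// osd_recover_clone_overlap_limit to avoid pathological hole counts.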
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i = 0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j = i - 1; j >= 0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j = i + 1; j < snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j - 1]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
    return;
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
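
// Choose a pullee and queue a PullOp. The source is normally picked at
// random among the shards known to hold the object (osd_debug_feed_pullee
// can force one for testing); for snap objects the clone-subset math above
// trims the pull to just the ranges we cannot clone locally.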
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
  ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
  eversion_t _v = missing_iter->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t> &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
	p = it;
	break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because user fed a wrong pullee
    p = q->second.begin();
    std::advance(p,
		 ceph::util::generate_random_number<int>(0,
							 q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << q->second
	  << " from osd." << fromshard
	  << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.get_items().find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
		(get_parent()->get_log().get_log().objects.find(soid)->second->op ==
		 pg_log_entry_t::LOST_REVERT) &&
		(get_parent()->get_log().get_log().objects.find(
		  soid)->second->reverting_to ==
		 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    assert(HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS));
    recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
    recovery_info.size = ((uint64_t)-1);
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}
/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
	   << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}
int ReplicatedBackend::prep_push(ObjectContextRef obc,
				 const hobject_t& soid, pg_shard_t peer,
				 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
		   obc->obs.oi.version, data_subset, clone_subsets,
		   pop, cache_dont_need, ObcLockManager());
}
int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
  assert(missing_iter != pmissing_iter->second.get_items().end());
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
			pi.recovery_progress,
			&new_progress,
			pop,
			&(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}
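
// Apply one chunk of pushed data. Multi-chunk transfers are staged in a
// temp recovery object and renamed over the target only when the final
// chunk arrives, so a partially recovered object is never visible under
// its real name.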
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  bool cache_dont_need,
  interval_set<uint64_t> &data_zeros,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist, less<>> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
							recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
	       << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    if (!complete) {
      t->remove(coll, ghobject_t(target_oid));
      t->touch(coll, ghobject_t(target_oid));
      object_info_t oi(attrs.at(OI_ATTR));
      t->set_alloc_hint(coll, ghobject_t(target_oid),
			oi.expected_object_size,
			oi.expected_write_size,
			oi.alloc_hint_flags);
    } else {
      if (!recovery_info.object_exist) {
	t->remove(coll, ghobject_t(target_oid));
	t->touch(coll, ghobject_t(target_oid));
	object_info_t oi(attrs.at(OI_ATTR));
	t->set_alloc_hint(coll, ghobject_t(target_oid),
			  oi.expected_object_size,
			  oi.expected_write_size,
			  oi.alloc_hint_flags);
      }
      // remove xattrs and update later if we overwrite the original object
      t->rmattrs(coll, ghobject_t(target_oid));
      // if we need to update omap, clear the previous content first
      if (clear_omap)
	t->omap_clear(coll, ghobject_t(target_oid));
    }

    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    struct stat st;
    int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
    if (get_parent()->pg_is_remote_backfilling()) {
      uint64_t size = 0;
      if (r == 0)
	size = st.st_size;
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
	get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	dout(10) << __func__ << " " << recovery_info.soid
		 << " backfill size " << recovery_info.oi.size
		 << " previous size " << size
		 << " net size " << recovery_info.oi.size - size
		 << dendl;
      }
    }

    // clone overlap content from the local object
    if (recovery_info.object_exist) {
      uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
      interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
      if (local_size) {
	local_intervals_included.insert(0, local_size);
	local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
	local_intervals_included.subtract(local_intervals_excluded);
      }
      for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
	   q != local_intervals_included.end();
	   ++q) {
	dout(15) << " clone_range " << recovery_info.soid << " "
		 << q.get_start() << "~" << q.get_len() << dendl;
	t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
		       q.get_start(), q.get_len(), q.get_start());
      }
    }
  }

  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
  if (data_zeros.size() > 0) {
    data_zeros.intersection_of(recovery_info.copy_subset);
    assert(intervals_included.subset_of(data_zeros));
    data_zeros.subtract(intervals_included);

    dout(20) << __func__ << " recovering object " << recovery_info.soid
	     << " copy_subset: " << recovery_info.copy_subset
	     << " intervals_included: " << intervals_included
	     << " data_zeros: " << data_zeros << dendl;

    for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
      t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
  }
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
	     p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
	       << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
				coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}
void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
	 recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
	 q != p->second.end();
	 ++q) {
      dout(15) << " clone_range " << p->first << " "
	       << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
		     q.get_start(), q.get_len(), q.get_start());
    }
  }
}
ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  ceph_assert(ssc);
  get_parent()->release_locks(manager); // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  ceph_assert((data_included.empty() && data.length() == 0) ||
	      (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // attrs only reference the origin bufferlist (decoded from the
    // MOSDPGPush message) whose size is much greater than attrs in
    // recovery. If the obc caches it (get_obc may cache the attr), the
    // whole origin bufferlist cannot be freed until the obc is evicted
    // from the obc cache. So rebuild the bufferlists before caching them.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    if (attrset.find(SS_ATTR) != attrset.end()) {
      bufferlist ssbv = attrset.at(SS_ATTR);
      SnapSet ss(ssbv);
      assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
    }
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }

  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data = std::move(usable_data);

  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if (z_length)
    data_zeros.insert(z_offset, z_length);
  bool complete = pi.is_complete();
  bool clear_omap = !pop.before_progress.omap_complete;

  submit_push_data(pi.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   pi.cache_dont_need,
		   data_zeros,
		   data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t, bool is_repair)
{
  dout(10) << "handle_push "
	   << pop.recovery_info
	   << pop.after_progress
	   << dendl;

  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;
  bool clear_omap = !pop.before_progress.omap_complete;
  interval_set<uint64_t> data_zeros;
  uint64_t z_offset = pop.before_progress.data_recovered_to;
  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
  if (z_length)
    data_zeros.insert(z_offset, z_length);
  response->soid = pop.recovery_info.soid;

  submit_push_data(pop.recovery_info,
		   first,
		   complete,
		   clear_omap,
		   true, // must be replicate
		   data_zeros,
		   pop.data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  if (complete) {
    if (is_repair) {
      get_parent()->inc_osd_stat_repaired();
      dout(20) << __func__ << " repair complete" << dendl;
    }
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
  }
}
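
// Batch queued PushOps into MOSDPGPush messages per target OSD, capped by
// osd_max_push_cost bytes and osd_max_push_objects entries per message.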
void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
       i != pushes.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    vector<PushOp>::iterator j = i->second.begin();
    while (j != i->second.end()) {
      uint64_t cost = 0;
      uint64_t pushes = 0;
      MOSDPGPush *msg = new MOSDPGPush();
      msg->from = get_parent()->whoami_shard();
      msg->pgid = get_parent()->primary_spg_t();
      msg->map_epoch = get_osdmap_epoch();
      msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
      msg->set_priority(prio);
      msg->is_repair = get_parent()->pg_is_repair();
      for (;
	   (j != i->second.end() &&
	    cost < cct->_conf->osd_max_push_cost &&
	    pushes < cct->_conf->osd_max_push_objects);
	   ++j) {
	dout(20) << __func__ << ": sending push " << *j
		 << " to osd." << i->first << dendl;
	cost += j->cost(cct);
	pushes += 1;
	msg->pushes.push_back(*j);
      }
      msg->set_cost(cost);
      get_parent()->send_message_osd_cluster(msg, con);
    }
  }
}
void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
{
  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
       i != pulls.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    dout(20) << __func__ << ": sending pulls " << i->second
	     << " to osd." << i->first << dendl;
    MOSDPGPull *msg = new MOSDPGPull();
    msg->from = parent->whoami_shard();
    msg->set_priority(prio);
    msg->pgid = get_parent()->primary_spg_t();
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->set_pulls(std::move(i->second));
    msg->compute_cost(cct);
    get_parent()->send_message_osd_cluster(msg, con);
  }
}
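
// Assemble one push chunk: omap keys are shipped first, then object data,
// with the total bounded by osd_recovery_max_chunk. fiemap is used to skip
// holes, and a full-object read on the first chunk is verified against the
// stored data digest when one is available.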
int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
                                     const ObjectRecoveryProgress &progress,
                                     ObjectRecoveryProgress *out_progress,
                                     PushOp *out_op,
                                     object_stat_sum_t *stat,
                                     bool cache_dont_need)
{
  ObjectRecoveryProgress _new_progress;
  if (!out_progress)
    out_progress = &_new_progress;
  ObjectRecoveryProgress &new_progress = *out_progress;
  new_progress = progress;

  dout(7) << __func__ << " " << recovery_info.soid
          << " v " << recovery_info.version
          << " size " << recovery_info.size
          << " recovery_info: " << recovery_info
          << dendl;

  eversion_t v = recovery_info.version;
  object_info_t oi;
  if (progress.first) {
    int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
    if (r < 0) {
      dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
      return r;
    }
    r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
    if (r < 0) {
      dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
      return r;
    }

    try {
      oi.decode(out_op->attrset[OI_ATTR]);
    } catch (...) {
      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
      return -EINVAL;
    }

    // If requestor didn't know the version, use ours
    if (v == eversion_t()) {
      v = oi.version;
    } else if (oi.version != v) {
      get_parent()->clog_error() << get_info().pgid << " push "
                                 << recovery_info.soid << " v "
                                 << recovery_info.version
                                 << " failed because local copy is "
                                 << oi.version;
      return -EINVAL;
    }

    new_progress.first = false;
  }
  // Once we provide the version subsequent requests will have it, so
  // at this point it must be known.
  ceph_assert(v != eversion_t());

  uint64_t available = cct->_conf->osd_recovery_max_chunk;
  if (!progress.omap_complete) {
    ObjectMap::ObjectMapIterator iter =
      store->get_omap_iterator(ch,
                               ghobject_t(recovery_info.soid));
    ceph_assert(iter);
    for (iter->lower_bound(progress.omap_recovered_to);
         iter->valid();
         iter->next()) {
      if (!out_op->omap_entries.empty() &&
          ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
            out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
           available <= iter->key().size() + iter->value().length()))
        break;
      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));

      if ((iter->key().size() + iter->value().length()) <= available)
        available -= (iter->key().size() + iter->value().length());
      else
        available = 0;
    }
    if (!iter->valid())
      new_progress.omap_complete = true;
    else
      new_progress.omap_recovered_to = iter->key();
  }

  if (available > 0) {
    if (!recovery_info.copy_subset.empty()) {
      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
      map<uint64_t, uint64_t> m;
      int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
                            copy_subset.range_end(), m);
      if (r >= 0) {
        interval_set<uint64_t> fiemap_included(std::move(m));
        copy_subset.intersection_of(fiemap_included);
      } else {
        // intersection of copy_subset and empty interval_set would be empty anyway
        copy_subset.clear();
      }

      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
                                    available);
      // zero filled section, skip to end!
      if (out_op->data_included.empty() ||
          out_op->data_included.range_end() == copy_subset.range_end())
        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
      else
        new_progress.data_recovered_to = out_op->data_included.range_end();
    }
  } else {
    out_op->data_included.clear();
  }

  auto origin_size = out_op->data_included.size();
  bufferlist bit;
  int r = store->readv(ch, ghobject_t(recovery_info.soid),
                       out_op->data_included, bit,
                       cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED : 0);
  if (cct->_conf->osd_debug_random_push_read_error &&
      (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
    dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
    r = -EIO;
  }
  if (r < 0)
    return r;
  if (out_op->data_included.size() != origin_size) {
    dout(10) << __func__ << " some extents get pruned "
             << out_op->data_included.size() << "/" << origin_size
             << dendl;
    new_progress.data_complete = true;
  }
  out_op->data.claim_append(bit);
  if (progress.first && !out_op->data_included.empty() &&
      out_op->data_included.begin().get_start() == 0 &&
      out_op->data.length() == oi.size && oi.is_data_digest()) {
    uint32_t crc = out_op->data.crc32c(-1);
    if (oi.data_digest != crc) {
      dout(0) << __func__ << " " << coll << std::hex
              << " full-object read crc 0x" << crc
              << " != expected 0x" << oi.data_digest
              << std::dec << " on " << recovery_info.soid << dendl;
      return -EIO;
    }
  }

  if (new_progress.is_complete(recovery_info)) {
    new_progress.data_complete = true;
    if (stat) {
      stat->num_objects_recovered++;
      if (get_parent()->pg_is_repair())
        stat->num_objects_repaired++;
    }
  } else if (progress.first && progress.omap_complete) {
    // The omap was skipped as unchanged on this first chunk, but recovery
    // could not complete in one pass, so later chunks must still recover it.
    new_progress.omap_complete = false;
  }

  if (stat) {
    stat->num_keys_recovered += out_op->omap_entries.size();
    stat->num_bytes_recovered += out_op->data.length();
    get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
  }

  get_parent()->get_logger()->inc(l_osd_push);
  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());

  out_op->version = v;
  out_op->soid = recovery_info.soid;
  out_op->recovery_info = recovery_info;
  out_op->after_progress = new_progress;
  out_op->before_progress = progress;
  return 0;
}
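
// Produce a push op that carries no data, used to answer a pull for an
// object we cannot read locally.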
void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
  op->recovery_info.version = eversion_t();
  op->version = eversion_t();
  op->soid = soid;
}
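
// Primary-side handler for a replica's push ack: either build the next
// chunk for that peer (returning true, with *reply filled in) or retire
// the peer's pushing entry, completing global recovery of the object
// once every peer has acked.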
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
             << ", or anybody else"
             << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
             << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      dout(10) << " pushing more from, "
               << pi->recovery_progress.data_recovered_to
               << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
        pi->recovery_info,
        pi->recovery_progress, &new_progress, reply,
        &(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
        dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
        error = true;
        goto done;
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
done:
      if (!error)
        get_parent()->on_peer_recover(peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
        if (!error)
          get_parent()->on_global_recover(soid, stat, false);
        else
          get_parent()->on_failed_pull(
            std::set<pg_shard_t>{ get_parent()->whoami_shard() },
            soid,
            v);
        pushing.erase(soid);
      } else {
        // This looks weird, but we erased the current peer and need to remember
        // the error on any other one, while getting more acks.
        if (error)
          pushing[soid].begin()->second.recovery_progress.error = true;
        dout(10) << "pushed " << soid << ", still waiting for push ack from "
                 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}
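
// Answer a peer's pull request: stat the local object, fix up
// size/copy_subset on the first chunk, and reply with a PushOp (blank
// if the object cannot be read).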
void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  struct stat st;
  int r = store->stat(ch, ghobject_t(soid), &st);
  if (r != 0) {
    get_parent()->clog_error() << get_info().pgid << " "
                               << peer << " tried to pull " << soid
                               << " but got " << cpp_strerror(-r);
    prep_push_op_blank(soid, reply);
  } else {
    ObjectRecoveryInfo &recovery_info = op.recovery_info;
    ObjectRecoveryProgress &progress = op.recovery_progress;
    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
      // Adjust size and copy_subset
      recovery_info.size = st.st_size;
      if (st.st_size) {
        interval_set<uint64_t> object_range;
        object_range.insert(0, st.st_size);
        recovery_info.copy_subset.intersection_of(object_range);
      } else {
        recovery_info.copy_subset.clear();
      }
      assert(recovery_info.clone_subset.empty());
    }

    r = build_push_op(recovery_info, progress, 0, reply);
    if (r < 0)
      prep_push_op_blank(soid, reply);
  }
}
/*
 * trim received data to remove what we don't want
 *
 * @param copy_subset intervals we want
 * @param intervals_received intervals we got
 * @param data_received data we got
 * @param intervals_usable intervals we want to keep
 * @param data_usable matching data we want to keep
 */
void ReplicatedBackend::trim_pushed_data(
  const interval_set<uint64_t> &copy_subset,
  const interval_set<uint64_t> &intervals_received,
  bufferlist data_received,
  interval_set<uint64_t> *intervals_usable,
  bufferlist *data_usable)
{
  if (intervals_received.subset_of(copy_subset)) {
    *intervals_usable = intervals_received;
    *data_usable = data_received;
    return;
  }

  intervals_usable->intersection_of(copy_subset,
                                    intervals_received);

  uint64_t off = 0;
  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
       p != intervals_received.end();
       ++p) {
    interval_set<uint64_t> x;
    x.insert(p.get_start(), p.get_len());
    x.intersection_of(copy_subset);
    for (interval_set<uint64_t>::const_iterator q = x.begin();
         q != x.end();
         ++q) {
      bufferlist sub;
      uint64_t data_off = off + (q.get_start() - p.get_start());
      sub.substr_of(data_received, data_off, q.get_len());
      data_usable->claim_append(sub);
    }
    off += p.get_len();
  }
}
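
// Give up on an in-flight pull of soid from the given shard, notifying
// the parent PG before dropping the pull state.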
void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
{
  dout(20) << __func__ << ": " << soid << " from " << from << dendl;
  auto it = pulling.find(soid);
  assert(it != pulling.end());
  get_parent()->on_failed_pull(
    { from },
    soid,
    it->second.recovery_info.version);

  clear_pull(it);
}
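
// Unindex piter's object from the per-peer pull map, dropping the
// peer's entry once it has no outstanding pulls.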
void ReplicatedBackend::clear_pull_from(
  map<hobject_t, PullInfo>::iterator piter)
{
  auto from = piter->second.from;
  pull_from_peer[from].erase(piter->second.soid);
  if (pull_from_peer[from].empty())
    pull_from_peer.erase(from);
}
void ReplicatedBackend::clear_pull(
  map<hobject_t, PullInfo>::iterator piter,
  bool clear_pull_from_peer)
{
  if (clear_pull_from_peer) {
    clear_pull_from(piter);
  }
  get_parent()->release_locks(piter->second.lock_manager);
  pulling.erase(piter);
}
int ReplicatedBackend::start_pushes(
  const hobject_t &soid,
  ObjectContextRef obc,
  RPGHandle *h)
{
  list<map<pg_shard_t, pg_missing_t>::const_iterator> shards;

  dout(20) << __func__ << " soid " << soid << dendl;
  // who needs it?
  ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
  for (set<pg_shard_t>::iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    if (*i == get_parent()->whoami_shard()) continue;
    pg_shard_t peer = *i;
    map<pg_shard_t, pg_missing_t>::const_iterator j =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(j != get_parent()->get_shard_missing().end());
    if (j->second.is_missing(soid)) {
      shards.push_back(j);
    }
  }

  // If more than 1 read will occur ignore possible request to not cache
  bool cache = shards.size() == 1 ? h->cache_dont_need : false;

  for (auto j : shards) {
    pg_shard_t peer = j->first;
    h->pushes[peer].push_back(PushOp());
    int r = prep_push_to_replica(obc, soid, peer,
                                 &(h->pushes[peer].back()), cache);
    if (r < 0) {
      // Back out all failed reads
      for (auto k : shards) {
        pg_shard_t p = k->first;
        dout(10) << __func__ << " clean up peer " << p << dendl;
        h->pushes[p].pop_back();
        if (p == peer) break;
      }
      get_parent()->cancel_pull(soid);
      return r;
    }
  }
  return shards.size();
}