// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"
#include "include/random.h"
#include "include/util.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};
class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      ceph_abort_msg("no support subop");
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}
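
// Illustrative use (a sketch; the counters and call site below appear
// elsewhere in this file): when a replicated write commits on this
// replica, repop_commit() calls
//
//   log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
//
// which bumps l_osd_sop / l_osd_sop_inb plus the l_osd_sop_w_* counters
// with the op's receive-to-now latency.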
ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    ceph_assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    ceph_assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
	       << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
	   j != i->second.end();
	   ++j) {
	get_parent()->cancel_pull(*j);
	clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}
bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}
bool ReplicatedBackend::_handle_message(
  OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}
void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}
void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (auto& op : in_progress_ops) {
    delete op.second->on_commit;
    op.second->on_commit = nullptr;
  }
  in_progress_ops.clear();
  clear_recovery_state();
}
int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
		  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  ceph_abort_msg("async read is not used by replica pool");
}
class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOpRef op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  ceph_assert(t);
  ceph_assert(added);
  ceph_assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
	ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
	if (op.is_fresh_object()) {
	  added->insert(oid);
	} else if (op.is_delete()) {
	  removed->insert(oid);
	}
      }

      if (op.delete_first) {
	t->remove(coll, goid);
      }

      match(
	op.init_type,
	[&](const PGTransaction::ObjectOperation::Init::None &) {
	},
	[&](const PGTransaction::ObjectOperation::Init::Create &op) {
	  t->touch(coll, goid);
	},
	[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
	  t->clone(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    goid);
	},
	[&](const PGTransaction::ObjectOperation::Init::Rename &op) {
	  ceph_assert(op.source.is_temp());
	  t->collection_move_rename(
	    coll,
	    ghobject_t(
	      op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	    coll,
	    goid);
	});

      if (op.truncate) {
	t->truncate(coll, goid, op.truncate->first);
	if (op.truncate->first != op.truncate->second)
	  t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
	map<string, bufferlist> attrs;
	for (auto &&p: op.attr_updates) {
	  if (p.second)
	    attrs[p.first] = *(p.second);
	  else
	    t->rmattr(coll, goid, p.first);
	}
	t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
	t->omap_clear(coll, goid);
      if (op.omap_header)
	t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
	using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
	switch (up.first) {
	case UpdateType::Remove:
	  t->omap_rmkeys(coll, goid, up.second);
	  break;
	case UpdateType::Insert:
	  t->omap_setkeys(coll, goid, up.second);
	  break;
	}
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
	auto &hint = *(op.alloc_hint);
	t->set_alloc_hint(
	  coll,
	  goid,
	  hint.expected_object_size,
	  hint.expected_write_size,
	  hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
	using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
	match(
	  extent.get_val(),
	  [&](const BufferUpdate::Write &op) {
	    t->write(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len(),
	      op.buffer,
	      op.fadvise_flags);
	  },
	  [&](const BufferUpdate::Zero &op) {
	    t->zero(
	      coll,
	      goid,
	      extent.get_off(),
	      extent.get_len());
	  },
	  [&](const BufferUpdate::CloneRange &op) {
	    ceph_assert(op.len == extent.get_len());
	    t->clone_range(
	      coll,
	      ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
	      goid,
	      op.offset,
	      extent.get_len(),
	      extent.get_off());
	  });
      }
    });
}
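
// Example of the mapping above (illustrative, not from any particular
// workload): a PGTransaction that creates object A and writes 4 KiB at
// offset 0 is lowered to roughly
//
//   t->touch(coll, ghobject_t(A, NO_GEN, NO_SHARD));
//   t->write(coll, ghobject_t(A, NO_GEN, NO_SHARD), 0, 4096, bl, flags);
//
// and, if A lives in the temp namespace, A also lands in *added so the
// caller can start tracking it as a temp object.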
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed);
  ceph_assert(added.size() <= 1);
  ceph_assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      new InProgressOp(
	tid, on_all_commit,
	orig_op, at_version)
      )
    );
  ceph_assert(insert_res.second);
  InProgressOp &op = *insert_res.first->second;

  op.waiting_for_commit.insert(
    parent->get_acting_recovery_backfill_shards().begin(),
    parent->get_acting_recovery_backfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
  if (at_version != eversion_t()) {
    parent->op_applied(at_version);
  }
}
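
// Commit flow for the transaction submitted above: the local commit fires
// C_OSD_OnOpCommit -> op_commit(), each replica's MOSDRepOpReply with
// CEPH_OSD_FLAG_ONDISK is handled by do_repop_reply(), and whichever of
// those paths empties op.waiting_for_commit last completes on_all_commit.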
void ReplicatedBackend::op_commit(
  InProgressOpRef& op)
{
  if (op->on_commit == nullptr) {
    // aborted
    return;
  }

  FUNCTRACE(cct);
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
    in_progress_ops.erase(op->tid);
  }
}
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  auto iter = in_progress_ops.find(rep_tid);
  if (iter != in_progress_ops.end()) {
    InProgressOp &ip_op = *iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
	      << " ack_type " << (int)r->ack_type
	      << " from " << from
	      << dendl;

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      ceph_assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
	ip_op.op->mark_event("sub_op_commit_rec");
	ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      // legacy peer; ignore
    }

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_commit.empty() &&
	ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
      in_progress_ops.erase(iter);
    }
  }
}
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			   CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
			   CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  ceph_assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << "  " << poid << " got "
	       << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (r == cct->_conf->osd_deep_scrub_stride) {
      dout(20) << __func__ << "  " << poid << " more data, digest so far 0x"
	       << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << "  " << poid << " done with data, digest 0x"
	     << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      ch,
      ghobject_t(
	poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << "  " << poid << " got "
	       << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      bool encoded = false;
      dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  ceph_assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf()->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    encode(iter->key(), bl);
    encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << "  " << poid
	       << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
	osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
	osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
	     << " large omap object detected. Object has " << pos.omap_keys
	     << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
	   << std::hex << o.omap_digest << std::dec << dendl;

  // Sum up omap usage
  if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
    dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
	     << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
    map.has_omap_keys = true;
    o.object_omap_bytes = pos.omap_bytes;
    o.object_omap_keys = pos.omap_keys;
  }

  // done!
  return 0;
}
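
// be_deep_scrub() is resumable: it returns -EINPROGRESS after reading one
// osd_deep_scrub_stride worth of data or osd_deep_scrub_keys omap keys,
// with the cursor saved in pos (pos.data_pos / pos.omap_pos), and the
// caller re-invokes it until it returns 0 with the digests published.
// For example (illustrative numbers), a 1 MiB object with a 512 KiB
// stride takes three data passes: two full-stride reads that each return
// -EINPROGRESS, then a short (zero-length) read that finalizes o.digest.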
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t, m->is_repair);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      ceph_assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
	bc->pushing[i.hoid].clear();
	bc->get_parent()->primary_failed(i.hoid);
	bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
	bc->get_parent()->on_global_recover(
	  i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
	this,
	m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
	get_parent(),
	get_parent()->bless_unlocked_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
	get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}
void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}
void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const bufferlist &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    ObjectStore::Transaction t;
    encode(t, wr->get_data());
  } else {
    encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  wr->logbl = log_entries;

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (parent->get_acting_recovery_backfill_shards().size() > 1) {
    if (op->op) {
      op->op->pg_trace.event("issue replication ops");
      ostringstream ss;
      set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
      replicas.erase(parent->whoami_shard());
      ss << "waiting for subops from " << replicas;
      op->op->mark_sub_op_sent(ss.str());
    }

    // avoid doing the same work in generate_subop
    bufferlist logs;
    encode(log_entries, logs);

    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
      if (shard == parent->whoami_shard()) continue;
      const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;

      Message *wr;
      wr = generate_subop(
	soid,
	at_version,
	tid,
	reqid,
	pg_trim_to,
	pg_roll_forward_to,
	new_temp_oid,
	discard_temp_oid,
	logs,
	hset_hist,
	op_t,
	shard,
	pinfo);
      if (op->op && op->op->pg_trace)
	wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
      get_parent()->send_message_osd_cluster(
	shard.osd, wr, get_osdmap_epoch());
    }
  }
}
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  ceph_assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
	   << " v " << m->version
	   << (m->logbl.length() ? " (transaction)" : " (parallel exec")
	   << " " << m->logbl.length()
	   << dendl;

  ceph_assert(m->map_epoch >= get_info().history.same_interval_since);

  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap_epoch();

  ceph_assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
  decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
	       << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }

  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
    for (auto &&e: log) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }

  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt,
    async);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
}
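
// Note the two-transaction split above: rm->localt carries local-only
// bookkeeping (log entries, temp-object removal) while rm->opt is the
// primary's shipped transaction; both are queued together, and
// C_OSD_RepModifyCommit -> repop_commit() sends the ondisk ack only once
// they commit.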
void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sup_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
	   << ", sending commit to osd." << rm->ackerosd
	   << dendl;
  ceph_assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}
// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << "  data_subset " << data_subset
	   << "  clone_subsets " << clone_subsets << dendl;
}
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << "  data_subset " << data_subset
	   << "  clone_subsets " << clone_subsets << dendl;
}
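
// Worked example (illustrative values): snapset.clones = [1, 2, 4] and
// soid.snap == 2 with clone_overlap[1] = {0~1024}.  The "prev" pass finds
// clone 1 present and readable, so clone_subsets[clone1] = {0~1024} and
// recovery can clone_range those bytes locally (see
// submit_push_complete()), leaving only data_subset minus {0~1024} to be
// pushed over the wire.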
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  ceph_assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
	p = it;
	break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because the user fed a wrong pullee
    p = q->second.begin();
    std::advance(p,
		 util::generate_random_number<int>(0,
						   q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << q->second
	  << " from osd." << fromshard
	  << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.get_items().find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
		(get_parent()->get_log().get_log().objects.find(soid)->second->op ==
		 pg_log_entry_t::LOST_REVERT) &&
		(get_parent()->get_log().get_log().objects.find(
		  soid)->second->reverting_to ==
		 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}
/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
	   << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}
int ReplicatedBackend::prep_push(ObjectContextRef obc,
				 const hobject_t& soid, pg_shard_t peer,
				 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
		   obc->obs.oi.version, data_subset, clone_subsets,
		   pop, cache_dont_need, ObcLockManager());
}
int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
			pi.recovery_progress,
			&new_progress,
			pop,
			&(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
							recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
	       << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
		      oi.expected_object_size,
		      oi.expected_write_size,
		      oi.alloc_hint_flags);
    if (get_parent()->pg_is_remote_backfilling()) {
      struct stat st;
      uint64_t size = 0;
      int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
      if (r == 0) {
	size = st.st_size;
      }
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
	get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	dout(10) << __func__ << " " << recovery_info.soid
		 << " backfill size " << recovery_info.oi.size
		 << " previous size " << size
		 << " net size " << recovery_info.oi.size - size
		 << dendl;
      }
    }
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
	     p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
	       << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
				coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}
void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
	 recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
	 q != p->second.end();
	 ++q) {
      dout(15) << " clone_range " << p->first << " "
	       << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
		     q.get_start(), q.get_len(), q.get_start());
    }
  }
}
ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  ceph_assert(ssc);
  get_parent()->release_locks(manager); // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  ceph_assert((data_included.empty() && data.length() == 0) ||
	      (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // The attrs still reference the origin bufferlist (decoded from the
    // MOSDPGPush message), whose size is much greater than the attrs we
    // need for recovery.  If the obc caches them (get_obc may cache the
    // attrs), the whole origin bufferlist cannot be freed until the obc
    // is evicted from the obc cache.  So rebuild the bufferlists before
    // caching them.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }

  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);

  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
		   complete, pi.cache_dont_need,
		   data_included, data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t, bool is_repair)
{
  dout(10) << "handle_push "
	   << pop.recovery_info
	   << pop.after_progress
	   << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;

  response->soid = pop.recovery_info.soid;
  submit_push_data(pop.recovery_info,
		   first,
		   complete,
		   true, // must be replicate
		   pop.data_included,
		   data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  if (complete) {
    if (is_repair) {
      get_parent()->inc_osd_stat_repaired();
      dout(20) << __func__ << " repair complete" << dendl;
    }
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
  }
}
void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
       i != pushes.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    vector<PushOp>::iterator j = i->second.begin();
    while (j != i->second.end()) {
      uint64_t cost = 0;
      uint64_t pushes = 0;
      MOSDPGPush *msg = new MOSDPGPush();
      msg->from = get_parent()->whoami_shard();
      msg->pgid = get_parent()->primary_spg_t();
      msg->map_epoch = get_osdmap_epoch();
      msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
      msg->set_priority(prio);
      msg->is_repair = get_parent()->pg_is_repair();
      for (;
	   (j != i->second.end() &&
	    cost < cct->_conf->osd_max_push_cost &&
	    pushes < cct->_conf->osd_max_push_objects) ;
	   ++j) {
	dout(20) << __func__ << ": sending push " << *j
		 << " to osd." << i->first << dendl;
	cost += j->cost(cct);
	pushes += 1;
	msg->pushes.push_back(*j);
      }
      msg->set_cost(cost);
      get_parent()->send_message_osd_cluster(msg, con);
    }
  }
}
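
// Batching sketch for the loop above (illustrative numbers): with
// osd_max_push_cost = 8 MiB and osd_max_push_objects = 10, thirty 1 MiB
// PushOps destined for one peer go out as four MOSDPGPush messages of at
// most 8 pushes each (8 + 8 + 8 + 6), since the cost cap trips before the
// object-count cap does.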
void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
{
  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
       i != pulls.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    dout(20) << __func__ << ": sending pulls " << i->second
	     << " to osd." << i->first << dendl;
    MOSDPGPull *msg = new MOSDPGPull();
    msg->from = parent->whoami_shard();
    msg->set_priority(prio);
    msg->pgid = get_parent()->primary_spg_t();
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->set_pulls(&i->second);
    msg->compute_cost(cct);
    get_parent()->send_message_osd_cluster(msg, con);
  }
}
int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
				     const ObjectRecoveryProgress &progress,
				     ObjectRecoveryProgress *out_progress,
				     PushOp *out_op,
				     object_stat_sum_t *stat,
				     bool cache_dont_need)
{
  ObjectRecoveryProgress _new_progress;
  if (!out_progress)
    out_progress = &_new_progress;
  ObjectRecoveryProgress &new_progress = *out_progress;
  new_progress = progress;

  dout(7) << __func__ << " " << recovery_info.soid
	  << " v " << recovery_info.version
	  << " size " << recovery_info.size
	  << " recovery_info: " << recovery_info
	  << dendl;

  eversion_t v = recovery_info.version;
  object_info_t oi;
  if (progress.first) {
    int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
    if (r < 0) {
      dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
      return r;
    }
    r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
    if (r < 0) {
      dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
      return r;
    }

    // Debug
    bufferlist bv = out_op->attrset[OI_ATTR];
    try {
      auto bliter = bv.cbegin();
      decode(oi, bliter);
    } catch (...) {
      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
      return -EINVAL;
    }

    // If requestor didn't know the version, use ours
    if (v == eversion_t()) {
      v = oi.version;
    } else if (oi.version != v) {
      get_parent()->clog_error() << get_info().pgid << " push "
				 << recovery_info.soid << " v "
				 << recovery_info.version
				 << " failed because local copy is "
				 << oi.version;
      return -EINVAL;
    }

    new_progress.first = false;
  }
  // Once we provide the version subsequent requests will have it, so
  // at this point it must be known.
  ceph_assert(v != eversion_t());

  uint64_t available = cct->_conf->osd_recovery_max_chunk;
  if (!progress.omap_complete) {
    ObjectMap::ObjectMapIterator iter =
      store->get_omap_iterator(ch,
			       ghobject_t(recovery_info.soid));
    ceph_assert(iter);
    for (iter->lower_bound(progress.omap_recovered_to);
	 iter->valid();
	 iter->next()) {
      if (!out_op->omap_entries.empty() &&
	  ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
	    out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
	   available <= iter->key().size() + iter->value().length()))
	break;
      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));

      if ((iter->key().size() + iter->value().length()) <= available)
	available -= (iter->key().size() + iter->value().length());
      else
	available = 0;
    }
    if (!iter->valid())
      new_progress.omap_complete = true;
    else
      new_progress.omap_recovered_to = iter->key();
  }

  if (available > 0) {
    if (!recovery_info.copy_subset.empty()) {
      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
      map<uint64_t, uint64_t> m;
      int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
			    copy_subset.range_end(), m);
      if (r >= 0) {
	interval_set<uint64_t> fiemap_included(m);
	copy_subset.intersection_of(fiemap_included);
      } else {
	// intersection of copy_subset and empty interval_set would be empty anyway
	copy_subset.clear();
      }

      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
				    available);
      if (out_op->data_included.empty()) // zero filled section, skip to end!
	new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
      else
	new_progress.data_recovered_to = out_op->data_included.range_end();
    }
  } else {
    out_op->data_included.clear();
  }

  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
       p != out_op->data_included.end();
       ++p) {
    bufferlist bit;
    int r = store->read(ch, ghobject_t(recovery_info.soid),
			p.get_start(), p.get_len(), bit,
			cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED : 0);
    if (cct->_conf->osd_debug_random_push_read_error &&
	(rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
      dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
      r = -EIO;
    }
    if (r < 0) {
      return r;
    }
    if (p.get_len() != bit.length()) {
      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
	       << " is actually " << p.get_start() << "~" << bit.length()
	       << dendl;
      interval_set<uint64_t>::iterator save = p++;
      if (bit.length() == 0)
	out_op->data_included.erase(save);     //Remove this empty interval
      else
	save.set_len(bit.length());
      // Remove any other intervals present
      while (p != out_op->data_included.end()) {
	interval_set<uint64_t>::iterator save = p++;
	out_op->data_included.erase(save);
      }
      new_progress.data_complete = true;
      out_op->data.claim_append(bit);
      break;
    }
    out_op->data.claim_append(bit);
  }
  if (progress.first && !out_op->data_included.empty() &&
      out_op->data_included.begin().get_start() == 0 &&
      out_op->data.length() == oi.size && oi.is_data_digest()) {
    uint32_t crc = out_op->data.crc32c(-1);
    if (oi.data_digest != crc) {
      dout(0) << __func__ << " " << coll << std::hex
	      << " full-object read crc 0x" << crc
	      << " != expected 0x" << oi.data_digest
	      << std::dec << " on " << recovery_info.soid << dendl;
      return -EIO;
    }
  }

  if (new_progress.is_complete(recovery_info)) {
    new_progress.data_complete = true;
    if (stat) {
      stat->num_objects_recovered++;
      if (get_parent()->pg_is_repair())
	stat->num_objects_repaired++;
    }
  }

  if (stat) {
    stat->num_keys_recovered += out_op->omap_entries.size();
    stat->num_bytes_recovered += out_op->data.length();
    get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
  }

  get_parent()->get_logger()->inc(l_osd_push);
  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());

  // send
  out_op->version = v;
  out_op->soid = recovery_info.soid;
  out_op->recovery_info = recovery_info;
  out_op->after_progress = new_progress;
  out_op->before_progress = progress;
  return 0;
}
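
// Chunking summary for build_push_op(): each call emits at most
// osd_recovery_max_chunk bytes, spending the budget on omap entries first
// and then on data extents (clamped further by
// osd_recovery_max_omap_entries_per_chunk), so a large object streams
// across several PushOps whose before_progress/after_progress stitch the
// sequence back together on the receiving side.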
void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
  op->recovery_info.version = eversion_t();
  op->version = eversion_t();
  op->soid = soid;
}
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << ", or anybody else"
	     << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      dout(10) << " pushing more from, "
	       << pi->recovery_progress.data_recovered_to
	       << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
	pi->recovery_info,
	pi->recovery_progress, &new_progress, reply,
	&(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
	dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;

	error = true;
	goto done;
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
done:
      if (!error)
	get_parent()->on_peer_recover(peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
	if (!error)
	  get_parent()->on_global_recover(soid, stat, false);
	else
	  get_parent()->on_primary_error(soid, v);
	pushing.erase(soid);
      } else {
	// This looks weird, but we erased the current peer and need to remember
	// the error on any other one, while getting more acks.
	if (error)
	  pushing[soid].begin()->second.recovery_progress.error = true;
	dout(10) << "pushed " << soid << ", still waiting for push ack from "
		 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}
void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  struct stat st;
  int r = store->stat(ch, ghobject_t(soid), &st);
  if (r != 0) {
    get_parent()->clog_error() << get_info().pgid << " "
			       << peer << " tried to pull " << soid
			       << " but got " << cpp_strerror(-r);
    prep_push_op_blank(soid, reply);
  } else {
    ObjectRecoveryInfo &recovery_info = op.recovery_info;
    ObjectRecoveryProgress &progress = op.recovery_progress;
    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
      // Adjust size and copy_subset
      recovery_info.size = st.st_size;
      recovery_info.copy_subset.clear();
      if (st.st_size)
	recovery_info.copy_subset.insert(0, st.st_size);
      ceph_assert(recovery_info.clone_subset.empty());
    }

    r = build_push_op(recovery_info, progress, 0, reply);
    if (r < 0)
      prep_push_op_blank(soid, reply);
  }
}
/**
 * trim received data to remove what we don't want
 *
 * @param copy_subset intervals we want
 * @param intervals_received intervals we got
 * @param data_received data we got
 * @param intervals_usable intervals we want to keep
 * @param data_usable matching data we want to keep
 */
void ReplicatedBackend::trim_pushed_data(
  const interval_set<uint64_t> &copy_subset,
  const interval_set<uint64_t> &intervals_received,
  bufferlist data_received,
  interval_set<uint64_t> *intervals_usable,
  bufferlist *data_usable)
{
  if (intervals_received.subset_of(copy_subset)) {
    *intervals_usable = intervals_received;
    *data_usable = data_received;
    return;
  }

  intervals_usable->intersection_of(copy_subset,
				    intervals_received);

  uint64_t off = 0;
  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
       p != intervals_received.end();
       ++p) {
    interval_set<uint64_t> x;
    x.insert(p.get_start(), p.get_len());
    x.intersection_of(copy_subset);
    for (interval_set<uint64_t>::const_iterator q = x.begin();
	 q != x.end();
	 ++q) {
      bufferlist sub;
      uint64_t data_off = off + (q.get_start() - p.get_start());
      sub.substr_of(data_received, data_off, q.get_len());
      data_usable->claim_append(sub);
    }
    off += p.get_len();
  }
}
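
// Worked example (illustrative values): copy_subset = {0~8},
// intervals_received = {0~4, 10~6}, and data_received holding the 10
// bytes for those two extents back to back.  The intersection is {0~4},
// so intervals_usable = {0~4} and data_usable keeps just the first 4
// bytes; the 6 bytes received for 10~6 are discarded.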
void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
{
  dout(20) << __func__ << ": " << soid << " from " << from << dendl;
  list<pg_shard_t> fl = { from };
  auto it = pulling.find(soid);
  ceph_assert(it != pulling.end());
  get_parent()->failed_push(fl, soid, it->second.recovery_info.version);

  clear_pull(pulling.find(soid));
}
void ReplicatedBackend::clear_pull_from(
  map<hobject_t, PullInfo>::iterator piter)
{
  auto from = piter->second.from;
  pull_from_peer[from].erase(piter->second.soid);
  if (pull_from_peer[from].empty())
    pull_from_peer.erase(from);
}
void ReplicatedBackend::clear_pull(
  map<hobject_t, PullInfo>::iterator piter,
  bool clear_pull_from_peer)
{
  if (clear_pull_from_peer) {
    clear_pull_from(piter);
  }
  get_parent()->release_locks(piter->second.lock_manager);
  pulling.erase(piter);
}
int ReplicatedBackend::start_pushes(
  const hobject_t &soid,
  ObjectContextRef obc,
  RPGHandle *h)
{
  list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;

  dout(20) << __func__ << " soid " << soid << dendl;
  // who needs it?
  ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
  for (set<pg_shard_t>::iterator i =
	 get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    if (*i == get_parent()->whoami_shard()) continue;
    pg_shard_t peer = *i;
    map<pg_shard_t, pg_missing_t>::const_iterator j =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(j != get_parent()->get_shard_missing().end());
    if (j->second.is_missing(soid)) {
      shards.push_back(j);
    }
  }

  // If more than 1 read will occur ignore possible request to not cache
  bool cache = shards.size() == 1 ? h->cache_dont_need : false;

  for (auto j : shards) {
    pg_shard_t peer = j->first;
    h->pushes[peer].push_back(PushOp());
    int r = prep_push_to_replica(obc, soid, peer,
				 &(h->pushes[peer].back()), cache);
    if (r < 0) {
      // Back out all failed reads
      for (auto k : shards) {
	pg_shard_t p = k->first;
	dout(10) << __func__ << " clean up peer " << p << dendl;
	h->pushes[p].pop_back();
	if (p == peer) break;
      }
      return r;
    }
  }
  return shards.size();
}