1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDSubOp.h"
18 #include "messages/MOSDRepOp.h"
19 #include "messages/MOSDSubOpReply.h"
20 #include "messages/MOSDRepOpReply.h"
21 #include "messages/MOSDPGPush.h"
22 #include "messages/MOSDPGPull.h"
23 #include "messages/MOSDPGPushReply.h"
24 #include "common/EventTrace.h"
26 #define dout_context cct
27 #define dout_subsys ceph_subsys_osd
28 #define DOUT_PREFIX_ARGS this
30 #define dout_prefix _prefix(_dout, this)
31 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
32 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
36 class PG_SendMessageOnConn
: public Context
{
37 PGBackend::Listener
*pg
;
42 PGBackend::Listener
*pg
,
44 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
45 void finish(int) override
{
46 pg
->send_message_osd_cluster(reply
, conn
.get());
50 class PG_RecoveryQueueAsync
: public Context
{
51 PGBackend::Listener
*pg
;
52 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener
*pg
,
56 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
57 void finish(int) override
{
58 pg
->schedule_recovery_work(c
.release());
63 struct ReplicatedBackend::C_OSD_RepModifyApply
: public Context
{
64 ReplicatedBackend
*pg
;
66 C_OSD_RepModifyApply(ReplicatedBackend
*pg
, RepModifyRef r
)
68 void finish(int r
) override
{
69 pg
->repop_applied(rm
);
73 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
74 ReplicatedBackend
*pg
;
76 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
78 void finish(int r
) override
{
83 static void log_subop_stats(
85 OpRequestRef op
, int subop
)
87 utime_t now
= ceph_clock_now();
88 utime_t latency
= now
;
89 latency
-= op
->get_req()->get_recv_stamp();
92 logger
->inc(l_osd_sop
);
93 logger
->tinc(l_osd_sop_lat
, latency
);
96 if (subop
!= l_osd_sop_pull
) {
97 uint64_t inb
= op
->get_req()->get_data().length();
98 logger
->inc(l_osd_sop_inb
, inb
);
99 if (subop
== l_osd_sop_w
) {
100 logger
->inc(l_osd_sop_w_inb
, inb
);
101 logger
->tinc(l_osd_sop_w_lat
, latency
);
102 } else if (subop
== l_osd_sop_push
) {
103 logger
->inc(l_osd_sop_push_inb
, inb
);
104 logger
->tinc(l_osd_sop_push_lat
, latency
);
106 assert("no support subop" == 0);
108 logger
->tinc(l_osd_sop_pull_lat
, latency
);
112 ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener
*pg
,
115 ObjectStore::CollectionHandle
&c
,
118 PGBackend(cct
, pg
, store
, coll
, c
) {}
120 void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle
*_h
,
124 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
125 send_pushes(priority
, h
->pushes
);
126 send_pulls(priority
, h
->pulls
);
130 void ReplicatedBackend::recover_object(
131 const hobject_t
&hoid
,
133 ObjectContextRef head
,
134 ObjectContextRef obc
,
138 dout(10) << __func__
<< ": " << hoid
<< dendl
;
139 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
140 if (get_parent()->get_local_missing().is_missing(hoid
)) {
151 int started
= start_pushes(
159 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
161 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
162 i
!= pull_from_peer
.end();
164 if (osdmap
->is_down(i
->first
.osd
)) {
165 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
166 << ", osdmap has it marked down" << dendl
;
167 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
168 j
!= i
->second
.end();
170 get_parent()->cancel_pull(*j
);
171 clear_pull(pulling
.find(*j
), false);
173 pull_from_peer
.erase(i
++);
180 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
182 dout(10) << __func__
<< ": " << op
<< dendl
;
183 switch (op
->get_req()->get_type()) {
184 case MSG_OSD_PG_PULL
:
191 bool ReplicatedBackend::handle_message(
195 dout(10) << __func__
<< ": " << op
<< dendl
;
196 switch (op
->get_req()->get_type()) {
197 case MSG_OSD_PG_PUSH
:
201 case MSG_OSD_PG_PULL
:
205 case MSG_OSD_PG_PUSH_REPLY
:
209 case MSG_OSD_SUBOP
: {
210 const MOSDSubOp
*m
= static_cast<const MOSDSubOp
*>(op
->get_req());
211 if (m
->ops
.size() == 0) {
217 case MSG_OSD_REPOP
: {
222 case MSG_OSD_REPOPREPLY
: {
233 void ReplicatedBackend::clear_recovery_state()
235 // clear pushing/pulling maps
236 for (auto &&i
: pushing
) {
237 for (auto &&j
: i
.second
) {
238 get_parent()->release_locks(j
.second
.lock_manager
);
243 for (auto &&i
: pulling
) {
244 get_parent()->release_locks(i
.second
.lock_manager
);
247 pull_from_peer
.clear();
250 void ReplicatedBackend::on_change()
252 dout(10) << __func__
<< dendl
;
253 for (map
<ceph_tid_t
, InProgressOp
>::iterator i
= in_progress_ops
.begin();
254 i
!= in_progress_ops
.end();
255 in_progress_ops
.erase(i
++)) {
256 if (i
->second
.on_commit
)
257 delete i
->second
.on_commit
;
258 if (i
->second
.on_applied
)
259 delete i
->second
.on_applied
;
261 clear_recovery_state();
264 void ReplicatedBackend::on_flushed()
268 int ReplicatedBackend::objects_read_sync(
269 const hobject_t
&hoid
,
275 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
278 struct AsyncReadCallback
: public GenContext
<ThreadPool::TPHandle
&> {
281 AsyncReadCallback(int r
, Context
*c
) : r(r
), c(c
) {}
282 void finish(ThreadPool::TPHandle
&) override
{
286 ~AsyncReadCallback() override
{
290 void ReplicatedBackend::objects_read_async(
291 const hobject_t
&hoid
,
292 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
293 pair
<bufferlist
*, Context
*> > > &to_read
,
294 Context
*on_complete
,
297 // There is no fast read implementation for replication backend yet
301 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
302 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
304 i
!= to_read
.end() && r
>= 0;
306 int _r
= store
->read(ch
, ghobject_t(hoid
), i
->first
.get
<0>(),
307 i
->first
.get
<1>(), *(i
->second
.first
),
309 if (i
->second
.second
) {
310 get_parent()->schedule_recovery_work(
311 get_parent()->bless_gencontext(
312 new AsyncReadCallback(_r
, i
->second
.second
)));
317 get_parent()->schedule_recovery_work(
318 get_parent()->bless_gencontext(
319 new AsyncReadCallback(r
, on_complete
)));
322 class C_OSD_OnOpCommit
: public Context
{
323 ReplicatedBackend
*pg
;
324 ReplicatedBackend::InProgressOp
*op
;
326 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
328 void finish(int) override
{
333 class C_OSD_OnOpApplied
: public Context
{
334 ReplicatedBackend
*pg
;
335 ReplicatedBackend::InProgressOp
*op
;
337 C_OSD_OnOpApplied(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
339 void finish(int) override
{
344 void generate_transaction(
345 PGTransactionUPtr
&pgt
,
347 bool legacy_log_entries
,
348 vector
<pg_log_entry_t
> &log_entries
,
349 ObjectStore::Transaction
*t
,
350 set
<hobject_t
> *added
,
351 set
<hobject_t
> *removed
)
357 for (auto &&le
: log_entries
) {
358 le
.mark_unrollbackable();
359 auto oiter
= pgt
->op_map
.find(le
.soid
);
360 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
361 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
362 ::encode(oiter
->second
.updated_snaps
->second
, bl
);
364 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
368 pgt
->safe_create_traverse(
369 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
370 const hobject_t
&oid
= obj_op
.first
;
371 const ghobject_t goid
=
372 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
373 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
376 if (op
.is_fresh_object()) {
378 } else if (op
.is_delete()) {
379 removed
->insert(oid
);
383 if (op
.delete_first
) {
384 t
->remove(coll
, goid
);
389 [&](const PGTransaction::ObjectOperation::Init::None
&) {
391 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
392 t
->touch(coll
, goid
);
394 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
398 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
401 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
402 assert(op
.source
.is_temp());
403 t
->collection_move_rename(
406 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
412 t
->truncate(coll
, goid
, op
.truncate
->first
);
413 if (op
.truncate
->first
!= op
.truncate
->second
)
414 t
->truncate(coll
, goid
, op
.truncate
->second
);
417 if (!op
.attr_updates
.empty()) {
418 map
<string
, bufferlist
> attrs
;
419 for (auto &&p
: op
.attr_updates
) {
421 attrs
[p
.first
] = *(p
.second
);
423 t
->rmattr(coll
, goid
, p
.first
);
425 t
->setattrs(coll
, goid
, attrs
);
429 t
->omap_clear(coll
, goid
);
431 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
433 for (auto &&up
: op
.omap_updates
) {
434 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
436 case UpdateType::Remove
:
437 t
->omap_rmkeys(coll
, goid
, up
.second
);
439 case UpdateType::Insert
:
440 t
->omap_setkeys(coll
, goid
, up
.second
);
445 // updated_snaps doesn't matter since we marked unrollbackable
448 auto &hint
= *(op
.alloc_hint
);
452 hint
.expected_object_size
,
453 hint
.expected_write_size
,
457 for (auto &&extent
: op
.buffer_updates
) {
458 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
461 [&](const BufferUpdate::Write
&op
) {
469 [&](const BufferUpdate::Zero
&op
) {
476 [&](const BufferUpdate::CloneRange
&op
) {
477 assert(op
.len
== extent
.get_len());
480 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
490 void ReplicatedBackend::submit_transaction(
491 const hobject_t
&soid
,
492 const object_stat_sum_t
&delta_stats
,
493 const eversion_t
&at_version
,
494 PGTransactionUPtr
&&_t
,
495 const eversion_t
&trim_to
,
496 const eversion_t
&roll_forward_to
,
497 const vector
<pg_log_entry_t
> &_log_entries
,
498 boost::optional
<pg_hit_set_history_t
> &hset_history
,
499 Context
*on_local_applied_sync
,
500 Context
*on_all_acked
,
501 Context
*on_all_commit
,
504 OpRequestRef orig_op
)
510 vector
<pg_log_entry_t
> log_entries(_log_entries
);
511 ObjectStore::Transaction op_t
;
512 PGTransactionUPtr
t(std::move(_t
));
513 set
<hobject_t
> added
, removed
;
514 generate_transaction(
517 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
522 assert(added
.size() <= 1);
523 assert(removed
.size() <= 1);
525 assert(!in_progress_ops
.count(tid
));
526 InProgressOp
&op
= in_progress_ops
.insert(
530 tid
, on_all_commit
, on_all_acked
,
535 op
.waiting_for_applied
.insert(
536 parent
->get_actingbackfill_shards().begin(),
537 parent
->get_actingbackfill_shards().end());
538 op
.waiting_for_commit
.insert(
539 parent
->get_actingbackfill_shards().begin(),
540 parent
->get_actingbackfill_shards().end());
549 added
.size() ? *(added
.begin()) : hobject_t(),
550 removed
.size() ? *(removed
.begin()) : hobject_t(),
556 add_temp_objs(added
);
557 clear_temp_objs(removed
);
559 parent
->log_operation(
567 op_t
.register_on_applied_sync(on_local_applied_sync
);
568 op_t
.register_on_applied(
569 parent
->bless_context(
570 new C_OSD_OnOpApplied(this, &op
)));
571 op_t
.register_on_commit(
572 parent
->bless_context(
573 new C_OSD_OnOpCommit(this, &op
)));
575 vector
<ObjectStore::Transaction
> tls
;
576 tls
.push_back(std::move(op_t
));
578 parent
->queue_transactions(tls
, op
.op
);
581 void ReplicatedBackend::op_applied(
585 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_APPLIED_BEGIN", true);
586 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
588 op
->op
->mark_event("op_applied");
589 op
->op
->pg_trace
.event("op applied");
592 op
->waiting_for_applied
.erase(get_parent()->whoami_shard());
593 parent
->op_applied(op
->v
);
595 if (op
->waiting_for_applied
.empty()) {
596 op
->on_applied
->complete(0);
600 assert(!op
->on_commit
&& !op
->on_applied
);
601 in_progress_ops
.erase(op
->tid
);
605 void ReplicatedBackend::op_commit(
609 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
610 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
612 op
->op
->mark_event("op_commit");
613 op
->op
->pg_trace
.event("op commit");
616 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
618 if (op
->waiting_for_commit
.empty()) {
619 op
->on_commit
->complete(0);
623 assert(!op
->on_commit
&& !op
->on_applied
);
624 in_progress_ops
.erase(op
->tid
);
628 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
630 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
631 const MOSDRepOpReply
*r
= static_cast<const MOSDRepOpReply
*>(op
->get_req());
632 assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
636 // must be replication.
637 ceph_tid_t rep_tid
= r
->get_tid();
638 pg_shard_t from
= r
->from
;
640 if (in_progress_ops
.count(rep_tid
)) {
641 map
<ceph_tid_t
, InProgressOp
>::iterator iter
=
642 in_progress_ops
.find(rep_tid
);
643 InProgressOp
&ip_op
= iter
->second
;
644 const MOSDOp
*m
= NULL
;
646 m
= static_cast<const MOSDOp
*>(ip_op
.op
->get_req());
649 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
650 << " ack_type " << (int)r
->ack_type
654 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
655 << " ack_type " << (int)r
->ack_type
661 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
662 assert(ip_op
.waiting_for_commit
.count(from
));
663 ip_op
.waiting_for_commit
.erase(from
);
666 ss
<< "sub_op_commit_rec from " << from
;
667 ip_op
.op
->mark_event_string(ss
.str());
668 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
671 assert(ip_op
.waiting_for_applied
.count(from
));
674 ss
<< "sub_op_applied_rec from " << from
;
675 ip_op
.op
->mark_event_string(ss
.str());
676 ip_op
.op
->pg_trace
.event("sub_op_applied_rec");
679 ip_op
.waiting_for_applied
.erase(from
);
681 parent
->update_peer_last_complete_ondisk(
683 r
->get_last_complete_ondisk());
685 if (ip_op
.waiting_for_applied
.empty() &&
687 ip_op
.on_applied
->complete(0);
688 ip_op
.on_applied
= 0;
690 if (ip_op
.waiting_for_commit
.empty() &&
692 ip_op
.on_commit
->complete(0);
696 assert(!ip_op
.on_commit
&& !ip_op
.on_applied
);
697 in_progress_ops
.erase(iter
);
702 void ReplicatedBackend::be_deep_scrub(
703 const hobject_t
&poid
,
706 ThreadPool::TPHandle
&handle
)
708 dout(10) << __func__
<< " " << poid
<< " seed "
709 << std::hex
<< seed
<< std::dec
<< dendl
;
710 bufferhash
h(seed
), oh(seed
);
711 bufferlist bl
, hdrbl
;
715 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
| CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
718 handle
.reset_tp_timeout();
722 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
724 cct
->_conf
->osd_deep_scrub_stride
, bl
,
725 fadvise_flags
, true);
734 dout(25) << __func__
<< " " << poid
<< " got "
735 << r
<< " on read, read_error" << dendl
;
739 o
.digest
= h
.digest();
740 o
.digest_present
= true;
743 r
= store
->omap_get_header(
746 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
748 // NOTE: bobtail to giant, we would crc the head as (len, head).
749 // that changes at the same time we start using a non-zero seed.
750 if (r
== 0 && hdrbl
.length()) {
751 dout(25) << "CRC header " << string(hdrbl
.c_str(), hdrbl
.length())
761 } else if (r
== -EIO
) {
762 dout(25) << __func__
<< " " << poid
<< " got "
763 << r
<< " on omap header read, read_error" << dendl
;
768 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
771 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
773 uint64_t keys_scanned
= 0;
774 for (iter
->seek_to_first(); iter
->status() == 0 && iter
->valid();
776 if (cct
->_conf
->osd_scan_list_ping_tp_interval
&&
777 (keys_scanned
% cct
->_conf
->osd_scan_list_ping_tp_interval
== 0)) {
778 handle
.reset_tp_timeout();
782 dout(25) << "CRC key " << iter
->key() << " value:\n";
783 iter
->value().hexdump(*_dout
);
786 ::encode(iter
->key(), bl
);
787 ::encode(iter
->value(), bl
);
792 if (iter
->status() < 0) {
793 dout(25) << __func__
<< " " << poid
794 << " on omap scan, db status error" << dendl
;
799 //Store final calculated CRC32 of omap header & key/values
800 o
.omap_digest
= oh
.digest();
801 o
.omap_digest_present
= true;
802 dout(20) << __func__
<< " " << poid
<< " omap_digest "
803 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
806 void ReplicatedBackend::_do_push(OpRequestRef op
)
808 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
809 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
810 pg_shard_t from
= m
->from
;
814 vector
<PushReplyOp
> replies
;
815 ObjectStore::Transaction t
;
817 if (get_parent()->check_failsafe_full(ss
)) {
818 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
821 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
822 i
!= m
->pushes
.end();
824 replies
.push_back(PushReplyOp());
825 handle_push(from
, *i
, &(replies
.back()), &t
);
828 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
829 reply
->from
= get_parent()->whoami_shard();
830 reply
->set_priority(m
->get_priority());
831 reply
->pgid
= get_info().pgid
;
832 reply
->map_epoch
= m
->map_epoch
;
833 reply
->min_epoch
= m
->min_epoch
;
834 reply
->replies
.swap(replies
);
835 reply
->compute_cost(cct
);
837 t
.register_on_complete(
838 new PG_SendMessageOnConn(
839 get_parent(), reply
, m
->get_connection()));
841 get_parent()->queue_transaction(std::move(t
));
844 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
845 ReplicatedBackend
*bc
;
846 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
848 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
849 : bc(bc
), priority(priority
) {}
851 void finish(ThreadPool::TPHandle
&handle
) override
{
852 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
853 for (auto &&i
: to_continue
) {
854 auto j
= bc
->pulling
.find(i
.hoid
);
855 assert(j
!= bc
->pulling
.end());
856 ObjectContextRef obc
= j
->second
.obc
;
857 bc
->clear_pull(j
, false /* already did it */);
858 if (!bc
->start_pushes(i
.hoid
, obc
, h
)) {
859 bc
->get_parent()->on_global_recover(
862 handle
.reset_tp_timeout();
864 bc
->run_recovery_op(h
, priority
);
868 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
870 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
871 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
872 pg_shard_t from
= m
->from
;
876 vector
<PullOp
> replies(1);
879 if (get_parent()->check_failsafe_full(ss
)) {
880 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push): " << ss
.str() << dendl
;
884 ObjectStore::Transaction t
;
885 list
<pull_complete_info
> to_continue
;
886 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
887 i
!= m
->pushes
.end();
889 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
891 replies
.push_back(PullOp());
893 if (!to_continue
.empty()) {
894 C_ReplicatedBackend_OnPullComplete
*c
=
895 new C_ReplicatedBackend_OnPullComplete(
898 c
->to_continue
.swap(to_continue
);
899 t
.register_on_complete(
900 new PG_RecoveryQueueAsync(
902 get_parent()->bless_gencontext(c
)));
904 replies
.erase(replies
.end() - 1);
906 if (replies
.size()) {
907 MOSDPGPull
*reply
= new MOSDPGPull
;
908 reply
->from
= parent
->whoami_shard();
909 reply
->set_priority(m
->get_priority());
910 reply
->pgid
= get_info().pgid
;
911 reply
->map_epoch
= m
->map_epoch
;
912 reply
->min_epoch
= m
->min_epoch
;
913 reply
->set_pulls(&replies
);
914 reply
->compute_cost(cct
);
916 t
.register_on_complete(
917 new PG_SendMessageOnConn(
918 get_parent(), reply
, m
->get_connection()));
921 get_parent()->queue_transaction(std::move(t
));
924 void ReplicatedBackend::do_pull(OpRequestRef op
)
926 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
927 assert(m
->get_type() == MSG_OSD_PG_PULL
);
928 pg_shard_t from
= m
->from
;
930 map
<pg_shard_t
, vector
<PushOp
> > replies
;
931 vector
<PullOp
> pulls
;
932 m
->take_pulls(&pulls
);
933 for (auto& i
: pulls
) {
934 replies
[from
].push_back(PushOp());
935 handle_pull(from
, i
, &(replies
[from
].back()));
937 send_pushes(m
->get_priority(), replies
);
940 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
942 const MOSDPGPushReply
*m
= static_cast<const MOSDPGPushReply
*>(op
->get_req());
943 assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
944 pg_shard_t from
= m
->from
;
946 vector
<PushOp
> replies(1);
947 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
948 i
!= m
->replies
.end();
950 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
952 replies
.push_back(PushOp());
954 replies
.erase(replies
.end() - 1);
956 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
957 _replies
[from
].swap(replies
);
958 send_pushes(m
->get_priority(), _replies
);
961 Message
* ReplicatedBackend::generate_subop(
962 const hobject_t
&soid
,
963 const eversion_t
&at_version
,
966 eversion_t pg_trim_to
,
967 eversion_t pg_roll_forward_to
,
968 hobject_t new_temp_oid
,
969 hobject_t discard_temp_oid
,
970 const vector
<pg_log_entry_t
> &log_entries
,
971 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
972 ObjectStore::Transaction
&op_t
,
974 const pg_info_t
&pinfo
)
976 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
977 // forward the write/update/whatever
978 MOSDRepOp
*wr
= new MOSDRepOp(
979 reqid
, parent
->whoami_shard(),
980 spg_t(get_info().pgid
.pgid
, peer
.shard
),
982 get_osdmap()->get_epoch(),
983 parent
->get_last_peering_reset_epoch(),
986 // ship resulting transaction, log entries, and pg_stats
987 if (!parent
->should_send_op(peer
, soid
)) {
988 dout(10) << "issue_repop shipping empty opt to osd." << peer
989 <<", object " << soid
990 << " beyond MAX(last_backfill_started "
991 << ", pinfo.last_backfill "
992 << pinfo
.last_backfill
<< ")" << dendl
;
993 ObjectStore::Transaction t
;
994 ::encode(t
, wr
->get_data());
996 ::encode(op_t
, wr
->get_data());
997 wr
->get_header().data_off
= op_t
.get_data_alignment();
1000 ::encode(log_entries
, wr
->logbl
);
1002 if (pinfo
.is_incomplete())
1003 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
1005 wr
->pg_stats
= get_info().stats
;
1007 wr
->pg_trim_to
= pg_trim_to
;
1008 wr
->pg_roll_forward_to
= pg_roll_forward_to
;
1010 wr
->new_temp_oid
= new_temp_oid
;
1011 wr
->discard_temp_oid
= discard_temp_oid
;
1012 wr
->updated_hit_set_history
= hset_hist
;
1016 void ReplicatedBackend::issue_op(
1017 const hobject_t
&soid
,
1018 const eversion_t
&at_version
,
1021 eversion_t pg_trim_to
,
1022 eversion_t pg_roll_forward_to
,
1023 hobject_t new_temp_oid
,
1024 hobject_t discard_temp_oid
,
1025 const vector
<pg_log_entry_t
> &log_entries
,
1026 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1028 ObjectStore::Transaction
&op_t
)
1031 op
->op
->pg_trace
.event("issue replication ops");
1033 if (parent
->get_actingbackfill_shards().size() > 1) {
1035 set
<pg_shard_t
> replicas
= parent
->get_actingbackfill_shards();
1036 replicas
.erase(parent
->whoami_shard());
1037 ss
<< "waiting for subops from " << replicas
;
1039 op
->op
->mark_sub_op_sent(ss
.str());
1041 for (set
<pg_shard_t
>::const_iterator i
=
1042 parent
->get_actingbackfill_shards().begin();
1043 i
!= parent
->get_actingbackfill_shards().end();
1045 if (*i
== parent
->whoami_shard()) continue;
1046 pg_shard_t peer
= *i
;
1047 const pg_info_t
&pinfo
= parent
->get_shard_info().find(peer
)->second
;
1050 wr
= generate_subop(
1065 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1066 get_parent()->send_message_osd_cluster(
1067 peer
.osd
, wr
, get_osdmap()->get_epoch());
1072 void ReplicatedBackend::do_repop(OpRequestRef op
)
1074 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1075 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(op
->get_req());
1076 int msg_type
= m
->get_type();
1077 assert(MSG_OSD_REPOP
== msg_type
);
1079 const hobject_t
& soid
= m
->poid
;
1081 dout(10) << __func__
<< " " << soid
1082 << " v " << m
->version
1083 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1084 << " " << m
->logbl
.length()
1088 assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1090 // we better not be missing this.
1091 assert(!parent
->get_log().get_missing().is_missing(soid
));
1093 int ackerosd
= m
->get_source().num();
1097 RepModifyRef
rm(std::make_shared
<RepModify
>());
1099 rm
->ackerosd
= ackerosd
;
1100 rm
->last_complete
= get_info().last_complete
;
1101 rm
->epoch_started
= get_osdmap()->get_epoch();
1103 assert(m
->logbl
.length());
1104 // shipped transaction and log entries
1105 vector
<pg_log_entry_t
> log
;
1107 bufferlist::iterator p
= const_cast<bufferlist
&>(m
->get_data()).begin();
1108 ::decode(rm
->opt
, p
);
1110 if (m
->new_temp_oid
!= hobject_t()) {
1111 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1112 add_temp_obj(m
->new_temp_oid
);
1114 if (m
->discard_temp_oid
!= hobject_t()) {
1115 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1116 if (rm
->opt
.empty()) {
1117 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1118 << " since we won't get the transaction" << dendl
;
1119 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1121 clear_temp_obj(m
->discard_temp_oid
);
1124 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1126 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1128 bool update_snaps
= false;
1129 if (!rm
->opt
.empty()) {
1130 // If the opt is non-empty, we infer we are before
1131 // last_backfill (according to the primary, not our
1132 // not-quite-accurate value), and should update the
1133 // collections now. Otherwise, we do it later on push.
1134 update_snaps
= true;
1136 parent
->update_stats(m
->pg_stats
);
1137 parent
->log_operation(
1139 m
->updated_hit_set_history
,
1141 m
->pg_roll_forward_to
,
1145 rm
->opt
.register_on_commit(
1146 parent
->bless_context(
1147 new C_OSD_RepModifyCommit(this, rm
)));
1148 rm
->localt
.register_on_applied(
1149 parent
->bless_context(
1150 new C_OSD_RepModifyApply(this, rm
)));
1151 vector
<ObjectStore::Transaction
> tls
;
1153 tls
.push_back(std::move(rm
->localt
));
1154 tls
.push_back(std::move(rm
->opt
));
1155 parent
->queue_transactions(tls
, op
);
1156 // op is cleaned up by oncommit/onapply when both are executed
1159 void ReplicatedBackend::repop_applied(RepModifyRef rm
)
1161 rm
->op
->mark_event("sub_op_applied");
1163 rm
->op
->pg_trace
.event("sup_op_applied");
1165 dout(10) << __func__
<< " on " << rm
<< " op "
1166 << *rm
->op
->get_req() << dendl
;
1167 const Message
*m
= rm
->op
->get_req();
1168 const MOSDRepOp
*req
= static_cast<const MOSDRepOp
*>(m
);
1169 eversion_t version
= req
->version
;
1171 // send ack to acker only if we haven't sent a commit already
1172 if (!rm
->committed
) {
1173 Message
*ack
= new MOSDRepOpReply(
1174 req
, parent
->whoami_shard(),
1175 0, get_osdmap()->get_epoch(), req
->min_epoch
, CEPH_OSD_FLAG_ACK
);
1176 ack
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match commit priority!
1177 ack
->trace
= rm
->op
->pg_trace
;
1178 get_parent()->send_message_osd_cluster(
1179 rm
->ackerosd
, ack
, get_osdmap()->get_epoch());
1182 parent
->op_applied(version
);
1185 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1187 rm
->op
->mark_commit_sent();
1188 rm
->op
->pg_trace
.event("sup_op_commit");
1189 rm
->committed
= true;
1192 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(rm
->op
->get_req());
1193 assert(m
->get_type() == MSG_OSD_REPOP
);
1194 dout(10) << __func__
<< " on op " << *m
1195 << ", sending commit to osd." << rm
->ackerosd
1197 assert(get_osdmap()->is_up(rm
->ackerosd
));
1199 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1201 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1203 get_parent()->whoami_shard(),
1204 0, get_osdmap()->get_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1205 reply
->set_last_complete_ondisk(rm
->last_complete
);
1206 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1207 reply
->trace
= rm
->op
->pg_trace
;
1208 get_parent()->send_message_osd_cluster(
1209 rm
->ackerosd
, reply
, get_osdmap()->get_epoch());
1211 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1215 // ===========================================================
1217 void ReplicatedBackend::calc_head_subsets(
1218 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1219 const pg_missing_t
& missing
,
1220 const hobject_t
&last_backfill
,
1221 interval_set
<uint64_t>& data_subset
,
1222 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1223 ObcLockManager
&manager
)
1225 dout(10) << "calc_head_subsets " << head
1226 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1228 uint64_t size
= obc
->obs
.oi
.size
;
1230 data_subset
.insert(0, size
);
1232 if (get_parent()->get_pool().allow_incomplete_clones()) {
1233 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1237 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1238 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1243 interval_set
<uint64_t> cloning
;
1244 interval_set
<uint64_t> prev
;
1246 prev
.insert(0, size
);
1248 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1250 c
.snap
= snapset
.clones
[j
];
1251 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1252 if (!missing
.is_missing(c
) &&
1253 c
< last_backfill
&&
1254 get_parent()->try_lock_for_read(c
, manager
)) {
1255 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1256 << " overlap " << prev
<< dendl
;
1257 clone_subsets
[c
] = prev
;
1258 cloning
.union_of(prev
);
1261 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1262 << " overlap " << prev
<< dendl
;
1266 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1267 dout(10) << "skipping clone, too many holes" << dendl
;
1268 get_parent()->release_locks(manager
);
1269 clone_subsets
.clear();
1273 // what's left for us to push?
1274 data_subset
.subtract(cloning
);
1276 dout(10) << "calc_head_subsets " << head
1277 << " data_subset " << data_subset
1278 << " clone_subsets " << clone_subsets
<< dendl
;
1281 void ReplicatedBackend::calc_clone_subsets(
1282 SnapSet
& snapset
, const hobject_t
& soid
,
1283 const pg_missing_t
& missing
,
1284 const hobject_t
&last_backfill
,
1285 interval_set
<uint64_t>& data_subset
,
1286 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1287 ObcLockManager
&manager
)
1289 dout(10) << "calc_clone_subsets " << soid
1290 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1292 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1294 data_subset
.insert(0, size
);
1296 if (get_parent()->get_pool().allow_incomplete_clones()) {
1297 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1301 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1302 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1307 for (i
=0; i
< snapset
.clones
.size(); i
++)
1308 if (snapset
.clones
[i
] == soid
.snap
)
1311 // any overlap with next older clone?
1312 interval_set
<uint64_t> cloning
;
1313 interval_set
<uint64_t> prev
;
1315 prev
.insert(0, size
);
1316 for (int j
=i
-1; j
>=0; j
--) {
1318 c
.snap
= snapset
.clones
[j
];
1319 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1320 if (!missing
.is_missing(c
) &&
1321 c
< last_backfill
&&
1322 get_parent()->try_lock_for_read(c
, manager
)) {
1323 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1324 << " overlap " << prev
<< dendl
;
1325 clone_subsets
[c
] = prev
;
1326 cloning
.union_of(prev
);
1329 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1330 << " overlap " << prev
<< dendl
;
1333 // overlap with next newest?
1334 interval_set
<uint64_t> next
;
1336 next
.insert(0, size
);
1337 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1339 c
.snap
= snapset
.clones
[j
];
1340 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1341 if (!missing
.is_missing(c
) &&
1342 c
< last_backfill
&&
1343 get_parent()->try_lock_for_read(c
, manager
)) {
1344 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1345 << " overlap " << next
<< dendl
;
1346 clone_subsets
[c
] = next
;
1347 cloning
.union_of(next
);
1350 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1351 << " overlap " << next
<< dendl
;
1354 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1355 dout(10) << "skipping clone, too many holes" << dendl
;
1356 get_parent()->release_locks(manager
);
1357 clone_subsets
.clear();
1362 // what's left for us to push?
1363 data_subset
.subtract(cloning
);
1365 dout(10) << "calc_clone_subsets " << soid
1366 << " data_subset " << data_subset
1367 << " clone_subsets " << clone_subsets
<< dendl
;
1370 void ReplicatedBackend::prepare_pull(
1372 const hobject_t
& soid
,
1373 ObjectContextRef headctx
,
1376 assert(get_parent()->get_local_missing().get_items().count(soid
));
1377 eversion_t _v
= get_parent()->get_local_missing().get_items().find(
1380 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1381 get_parent()->get_missing_loc_shards());
1382 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1383 get_parent()->get_shard_missing());
1384 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1385 assert(q
!= missing_loc
.end());
1386 assert(!q
->second
.empty());
1389 vector
<pg_shard_t
> shuffle(q
->second
.begin(), q
->second
.end());
1390 random_shuffle(shuffle
.begin(), shuffle
.end());
1391 vector
<pg_shard_t
>::iterator p
= shuffle
.begin();
1392 assert(get_osdmap()->is_up(p
->osd
));
1393 pg_shard_t fromshard
= *p
;
1395 dout(7) << "pull " << soid
1397 << " on osds " << q
->second
1398 << " from osd." << fromshard
1401 assert(peer_missing
.count(fromshard
));
1402 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1403 if (pmissing
.is_missing(soid
, v
)) {
1404 assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1405 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1406 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1407 << " rather than at version " << v
<< dendl
;
1408 v
= pmissing
.get_items().find(soid
)->second
.have
;
1409 assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1410 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1411 pg_log_entry_t::LOST_REVERT
) &&
1412 (get_parent()->get_log().get_log().objects
.find(
1413 soid
)->second
->reverting_to
==
1417 ObjectRecoveryInfo recovery_info
;
1418 ObcLockManager lock_manager
;
1420 if (soid
.is_snap()) {
1421 assert(!get_parent()->get_local_missing().is_missing(
1423 !get_parent()->get_local_missing().is_missing(
1424 soid
.get_snapdir()));
1427 SnapSetContext
*ssc
= headctx
->ssc
;
1429 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1430 recovery_info
.ss
= ssc
->snapset
;
1432 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1433 get_info().last_backfill
,
1434 recovery_info
.copy_subset
,
1435 recovery_info
.clone_subset
,
1437 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1438 dout(10) << " pulling " << recovery_info
<< dendl
;
1440 assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1441 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1443 // pulling head or unversioned object.
1444 // always pull the whole thing.
1445 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1446 recovery_info
.size
= ((uint64_t)-1);
1449 h
->pulls
[fromshard
].push_back(PullOp());
1450 PullOp
&op
= h
->pulls
[fromshard
].back();
1453 op
.recovery_info
= recovery_info
;
1454 op
.recovery_info
.soid
= soid
;
1455 op
.recovery_info
.version
= v
;
1456 op
.recovery_progress
.data_complete
= false;
1457 op
.recovery_progress
.omap_complete
= false;
1458 op
.recovery_progress
.data_recovered_to
= 0;
1459 op
.recovery_progress
.first
= true;
1461 assert(!pulling
.count(soid
));
1462 pull_from_peer
[fromshard
].insert(soid
);
1463 PullInfo
&pi
= pulling
[soid
];
1464 pi
.from
= fromshard
;
1466 pi
.head_ctx
= headctx
;
1467 pi
.recovery_info
= op
.recovery_info
;
1468 pi
.recovery_progress
= op
.recovery_progress
;
1469 pi
.cache_dont_need
= h
->cache_dont_need
;
1470 pi
.lock_manager
= std::move(lock_manager
);
1474 * intelligently push an object to a replica. make use of existing
1475 * clones/heads and dup data ranges where possible.
1477 void ReplicatedBackend::prep_push_to_replica(
1478 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1479 PushOp
*pop
, bool cache_dont_need
)
1481 const object_info_t
& oi
= obc
->obs
.oi
;
1482 uint64_t size
= obc
->obs
.oi
.size
;
1484 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1485 << " size " << size
<< " to osd." << peer
<< dendl
;
1487 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1488 interval_set
<uint64_t> data_subset
;
1490 ObcLockManager lock_manager
;
1491 // are we doing a clone on the replica?
1492 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1493 hobject_t head
= soid
;
1494 head
.snap
= CEPH_NOSNAP
;
1496 // try to base push off of clones that succeed/preceed poid
1497 // we need the head (and current SnapSet) locally to do that.
1498 if (get_parent()->get_local_missing().is_missing(head
)) {
1499 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1500 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1502 hobject_t snapdir
= head
;
1503 snapdir
.snap
= CEPH_SNAPDIR
;
1504 if (get_parent()->get_local_missing().is_missing(snapdir
)) {
1505 dout(15) << "push_to_replica missing snapdir " << snapdir
1506 << ", pushing raw clone" << dendl
;
1507 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1510 SnapSetContext
*ssc
= obc
->ssc
;
1512 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1513 pop
->recovery_info
.ss
= ssc
->snapset
;
1514 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1515 get_parent()->get_shard_missing().find(peer
);
1516 assert(pm
!= get_parent()->get_shard_missing().end());
1517 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1518 get_parent()->get_shard_info().find(peer
);
1519 assert(pi
!= get_parent()->get_shard_info().end());
1523 pi
->second
.last_backfill
,
1524 data_subset
, clone_subsets
,
1526 } else if (soid
.snap
== CEPH_NOSNAP
) {
1527 // pushing head or unversioned object.
1528 // base this on partially on replica's clones?
1529 SnapSetContext
*ssc
= obc
->ssc
;
1531 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1534 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1535 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1536 data_subset
, clone_subsets
,
1549 std::move(lock_manager
));
1552 void ReplicatedBackend::prep_push(ObjectContextRef obc
,
1553 const hobject_t
& soid
, pg_shard_t peer
,
1554 PushOp
*pop
, bool cache_dont_need
)
1556 interval_set
<uint64_t> data_subset
;
1557 if (obc
->obs
.oi
.size
)
1558 data_subset
.insert(0, obc
->obs
.oi
.size
);
1559 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1561 prep_push(obc
, soid
, peer
,
1562 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1563 pop
, cache_dont_need
, ObcLockManager());
1566 void ReplicatedBackend::prep_push(
1567 ObjectContextRef obc
,
1568 const hobject_t
& soid
, pg_shard_t peer
,
1570 interval_set
<uint64_t> &data_subset
,
1571 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1573 bool cache_dont_need
,
1574 ObcLockManager
&&lock_manager
)
1576 get_parent()->begin_peer_recover(peer
, soid
);
1578 PushInfo
&pi
= pushing
[soid
][peer
];
1580 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1581 pi
.recovery_info
.copy_subset
= data_subset
;
1582 pi
.recovery_info
.clone_subset
= clone_subsets
;
1583 pi
.recovery_info
.soid
= soid
;
1584 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1585 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1586 pi
.recovery_info
.version
= version
;
1587 pi
.lock_manager
= std::move(lock_manager
);
1589 ObjectRecoveryProgress new_progress
;
1590 int r
= build_push_op(pi
.recovery_info
,
1591 pi
.recovery_progress
,
1594 &(pi
.stat
), cache_dont_need
);
1596 pi
.recovery_progress
= new_progress
;
1599 void ReplicatedBackend::submit_push_data(
1600 const ObjectRecoveryInfo
&recovery_info
,
1603 bool cache_dont_need
,
1604 const interval_set
<uint64_t> &intervals_included
,
1605 bufferlist data_included
,
1606 bufferlist omap_header
,
1607 const map
<string
, bufferlist
> &attrs
,
1608 const map
<string
, bufferlist
> &omap_entries
,
1609 ObjectStore::Transaction
*t
)
1611 hobject_t target_oid
;
1612 if (first
&& complete
) {
1613 target_oid
= recovery_info
.soid
;
1615 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1616 recovery_info
.version
);
1618 dout(10) << __func__
<< ": Adding oid "
1619 << target_oid
<< " in the temp collection" << dendl
;
1620 add_temp_obj(target_oid
);
1625 t
->remove(coll
, ghobject_t(target_oid
));
1626 t
->touch(coll
, ghobject_t(target_oid
));
1627 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1628 if (omap_header
.length())
1629 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
1631 bufferlist bv
= attrs
.at(OI_ATTR
);
1632 object_info_t
oi(bv
);
1633 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1634 oi
.expected_object_size
,
1635 oi
.expected_write_size
,
1636 oi
.alloc_hint_flags
);
1639 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1640 if (cache_dont_need
)
1641 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1642 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1643 p
!= intervals_included
.end();
1646 bit
.substr_of(data_included
, off
, p
.get_len());
1647 t
->write(coll
, ghobject_t(target_oid
),
1648 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1652 if (!omap_entries
.empty())
1653 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1655 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1659 dout(10) << __func__
<< ": Removing oid "
1660 << target_oid
<< " from the temp collection" << dendl
;
1661 clear_temp_obj(target_oid
);
1662 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1663 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1664 coll
, ghobject_t(recovery_info
.soid
));
1667 submit_push_complete(recovery_info
, t
);
1671 void ReplicatedBackend::submit_push_complete(
1672 const ObjectRecoveryInfo
&recovery_info
,
1673 ObjectStore::Transaction
*t
)
1675 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1676 recovery_info
.clone_subset
.begin();
1677 p
!= recovery_info
.clone_subset
.end();
1679 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1680 q
!= p
->second
.end();
1682 dout(15) << " clone_range " << p
->first
<< " "
1683 << q
.get_start() << "~" << q
.get_len() << dendl
;
1684 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1685 q
.get_start(), q
.get_len(), q
.get_start());
1690 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1691 const ObjectRecoveryInfo
& recovery_info
,
1692 SnapSetContext
*ssc
,
1693 ObcLockManager
&manager
)
1695 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1696 return recovery_info
;
1697 ObjectRecoveryInfo new_info
= recovery_info
;
1698 new_info
.copy_subset
.clear();
1699 new_info
.clone_subset
.clear();
1701 get_parent()->release_locks(manager
); // might already have locks
1703 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1704 get_info().last_backfill
,
1705 new_info
.copy_subset
, new_info
.clone_subset
,
1710 bool ReplicatedBackend::handle_pull_response(
1711 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1712 list
<pull_complete_info
> *to_continue
,
1713 ObjectStore::Transaction
*t
)
1715 interval_set
<uint64_t> data_included
= pop
.data_included
;
1718 dout(10) << "handle_pull_response "
1719 << pop
.recovery_info
1720 << pop
.after_progress
1721 << " data.size() is " << data
.length()
1722 << " data_included: " << data_included
1724 if (pop
.version
== eversion_t()) {
1725 // replica doesn't have it!
1726 _failed_push(from
, pop
.soid
);
1730 const hobject_t
&hoid
= pop
.soid
;
1731 assert((data_included
.empty() && data
.length() == 0) ||
1732 (!data_included
.empty() && data
.length() > 0));
1734 auto piter
= pulling
.find(hoid
);
1735 if (piter
== pulling
.end()) {
1739 PullInfo
&pi
= piter
->second
;
1740 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1741 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1742 pi
.recovery_info
.copy_subset
.intersection_of(
1743 pop
.recovery_info
.copy_subset
);
1746 bool first
= pi
.recovery_progress
.first
;
1748 // attrs only reference the origin bufferlist (decode from
1749 // MOSDPGPush message) whose size is much greater than attrs in
1750 // recovery. If obc cache it (get_obc maybe cache the attr), this
1751 // causes the whole origin bufferlist would not be free until obc
1752 // is evicted from obc cache. So rebuild the bufferlists before
1754 auto attrset
= pop
.attrset
;
1755 for (auto& a
: attrset
) {
1758 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1759 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1760 pi
.recovery_info
= recalc_subsets(
1767 interval_set
<uint64_t> usable_intervals
;
1768 bufferlist usable_data
;
1769 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1774 data_included
= usable_intervals
;
1775 data
.claim(usable_data
);
1778 pi
.recovery_progress
= pop
.after_progress
;
1780 dout(10) << "new recovery_info " << pi
.recovery_info
1781 << ", new progress " << pi
.recovery_progress
1784 bool complete
= pi
.is_complete();
1786 submit_push_data(pi
.recovery_info
, first
,
1787 complete
, pi
.cache_dont_need
,
1788 data_included
, data
,
1794 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1795 pi
.stat
.num_bytes_recovered
+= data
.length();
1798 pi
.stat
.num_objects_recovered
++;
1799 clear_pull_from(piter
);
1800 to_continue
->push_back({hoid
, pi
.stat
});
1801 get_parent()->on_local_recover(
1802 hoid
, pi
.recovery_info
, pi
.obc
, t
);
1805 response
->soid
= pop
.soid
;
1806 response
->recovery_info
= pi
.recovery_info
;
1807 response
->recovery_progress
= pi
.recovery_progress
;
1812 void ReplicatedBackend::handle_push(
1813 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1814 ObjectStore::Transaction
*t
)
1816 dout(10) << "handle_push "
1817 << pop
.recovery_info
1818 << pop
.after_progress
1822 bool first
= pop
.before_progress
.first
;
1823 bool complete
= pop
.after_progress
.data_complete
&&
1824 pop
.after_progress
.omap_complete
;
1826 response
->soid
= pop
.recovery_info
.soid
;
1827 submit_push_data(pop
.recovery_info
,
1830 true, // must be replicate
1839 get_parent()->on_local_recover(
1840 pop
.recovery_info
.soid
,
1842 ObjectContextRef(), // ok, is replica
1846 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1848 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1851 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1853 get_osdmap()->get_epoch());
1856 vector
<PushOp
>::iterator j
= i
->second
.begin();
1857 while (j
!= i
->second
.end()) {
1859 uint64_t pushes
= 0;
1860 MOSDPGPush
*msg
= new MOSDPGPush();
1861 msg
->from
= get_parent()->whoami_shard();
1862 msg
->pgid
= get_parent()->primary_spg_t();
1863 msg
->map_epoch
= get_osdmap()->get_epoch();
1864 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1865 msg
->set_priority(prio
);
1867 (j
!= i
->second
.end() &&
1868 cost
< cct
->_conf
->osd_max_push_cost
&&
1869 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1871 dout(20) << __func__
<< ": sending push " << *j
1872 << " to osd." << i
->first
<< dendl
;
1873 cost
+= j
->cost(cct
);
1875 msg
->pushes
.push_back(*j
);
1877 msg
->set_cost(cost
);
1878 get_parent()->send_message_osd_cluster(msg
, con
);
1883 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1885 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1888 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1890 get_osdmap()->get_epoch());
1893 dout(20) << __func__
<< ": sending pulls " << i
->second
1894 << " to osd." << i
->first
<< dendl
;
1895 MOSDPGPull
*msg
= new MOSDPGPull();
1896 msg
->from
= parent
->whoami_shard();
1897 msg
->set_priority(prio
);
1898 msg
->pgid
= get_parent()->primary_spg_t();
1899 msg
->map_epoch
= get_osdmap()->get_epoch();
1900 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1901 msg
->set_pulls(&i
->second
);
1902 msg
->compute_cost(cct
);
1903 get_parent()->send_message_osd_cluster(msg
, con
);
1907 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1908 const ObjectRecoveryProgress
&progress
,
1909 ObjectRecoveryProgress
*out_progress
,
1911 object_stat_sum_t
*stat
,
1912 bool cache_dont_need
)
1914 ObjectRecoveryProgress _new_progress
;
1916 out_progress
= &_new_progress
;
1917 ObjectRecoveryProgress
&new_progress
= *out_progress
;
1918 new_progress
= progress
;
1920 dout(7) << "send_push_op " << recovery_info
.soid
1921 << " v " << recovery_info
.version
1922 << " size " << recovery_info
.size
1923 << " recovery_info: " << recovery_info
1926 if (progress
.first
) {
1927 int r
= store
->omap_get_header(coll
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
1929 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
1932 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
1934 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
1939 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
1940 object_info_t
oi(bv
);
1942 if (oi
.version
!= recovery_info
.version
) {
1943 get_parent()->clog_error() << get_info().pgid
<< " push "
1944 << recovery_info
.soid
<< " v "
1945 << recovery_info
.version
1946 << " failed because local copy is "
1951 new_progress
.first
= false;
1954 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
1955 if (!progress
.omap_complete
) {
1956 ObjectMap::ObjectMapIterator iter
=
1957 store
->get_omap_iterator(coll
,
1958 ghobject_t(recovery_info
.soid
));
1960 for (iter
->lower_bound(progress
.omap_recovered_to
);
1962 iter
->next(false)) {
1963 if (!out_op
->omap_entries
.empty() &&
1964 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
1965 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
1966 available
<= iter
->key().size() + iter
->value().length()))
1968 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
1970 if ((iter
->key().size() + iter
->value().length()) <= available
)
1971 available
-= (iter
->key().size() + iter
->value().length());
1976 new_progress
.omap_complete
= true;
1978 new_progress
.omap_recovered_to
= iter
->key();
1981 if (available
> 0) {
1982 if (!recovery_info
.copy_subset
.empty()) {
1983 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
1984 map
<uint64_t, uint64_t> m
;
1985 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
1986 copy_subset
.range_end(), m
);
1988 interval_set
<uint64_t> fiemap_included(m
);
1989 copy_subset
.intersection_of(fiemap_included
);
1991 // intersection of copy_subset and empty interval_set would be empty anyway
1992 copy_subset
.clear();
1995 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
1997 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
1998 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2000 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2003 out_op
->data_included
.clear();
2006 for (interval_set
<uint64_t>::iterator p
= out_op
->data_included
.begin();
2007 p
!= out_op
->data_included
.end();
2010 store
->read(ch
, ghobject_t(recovery_info
.soid
),
2011 p
.get_start(), p
.get_len(), bit
,
2012 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
2013 if (p
.get_len() != bit
.length()) {
2014 dout(10) << " extent " << p
.get_start() << "~" << p
.get_len()
2015 << " is actually " << p
.get_start() << "~" << bit
.length()
2017 interval_set
<uint64_t>::iterator save
= p
++;
2018 if (bit
.length() == 0)
2019 out_op
->data_included
.erase(save
); //Remove this empty interval
2021 save
.set_len(bit
.length());
2022 // Remove any other intervals present
2023 while (p
!= out_op
->data_included
.end()) {
2024 interval_set
<uint64_t>::iterator save
= p
++;
2025 out_op
->data_included
.erase(save
);
2027 new_progress
.data_complete
= true;
2028 out_op
->data
.claim_append(bit
);
2031 out_op
->data
.claim_append(bit
);
2034 if (new_progress
.is_complete(recovery_info
)) {
2035 new_progress
.data_complete
= true;
2037 stat
->num_objects_recovered
++;
2041 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2042 stat
->num_bytes_recovered
+= out_op
->data
.length();
2045 get_parent()->get_logger()->inc(l_osd_push
);
2046 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2049 out_op
->version
= recovery_info
.version
;
2050 out_op
->soid
= recovery_info
.soid
;
2051 out_op
->recovery_info
= recovery_info
;
2052 out_op
->after_progress
= new_progress
;
2053 out_op
->before_progress
= progress
;
2057 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2059 op
->recovery_info
.version
= eversion_t();
2060 op
->version
= eversion_t();
2064 bool ReplicatedBackend::handle_push_reply(
2065 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2067 const hobject_t
&soid
= op
.soid
;
2068 if (pushing
.count(soid
) == 0) {
2069 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2070 << ", or anybody else"
2073 } else if (pushing
[soid
].count(peer
) == 0) {
2074 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2078 PushInfo
*pi
= &pushing
[soid
][peer
];
2080 if (!pi
->recovery_progress
.data_complete
) {
2081 dout(10) << " pushing more from, "
2082 << pi
->recovery_progress
.data_recovered_to
2083 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2084 ObjectRecoveryProgress new_progress
;
2085 int r
= build_push_op(
2087 pi
->recovery_progress
, &new_progress
, reply
,
2090 pi
->recovery_progress
= new_progress
;
2094 get_parent()->on_peer_recover(
2095 peer
, soid
, pi
->recovery_info
);
2097 get_parent()->release_locks(pi
->lock_manager
);
2098 object_stat_sum_t stat
= pi
->stat
;
2099 pushing
[soid
].erase(peer
);
2102 if (pushing
[soid
].empty()) {
2103 get_parent()->on_global_recover(soid
, stat
);
2104 pushing
.erase(soid
);
2106 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2107 << pushing
[soid
].size() << " others" << dendl
;
2114 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2116 const hobject_t
&soid
= op
.soid
;
2118 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2120 get_parent()->clog_error() << get_info().pgid
<< " "
2121 << peer
<< " tried to pull " << soid
2122 << " but got " << cpp_strerror(-r
);
2123 prep_push_op_blank(soid
, reply
);
2125 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2126 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2127 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2128 // Adjust size and copy_subset
2129 recovery_info
.size
= st
.st_size
;
2130 recovery_info
.copy_subset
.clear();
2132 recovery_info
.copy_subset
.insert(0, st
.st_size
);
2133 assert(recovery_info
.clone_subset
.empty());
2136 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2138 prep_push_op_blank(soid
, reply
);
2143 * trim received data to remove what we don't want
2145 * @param copy_subset intervals we want
2146 * @param data_included intervals we got
2147 * @param data_recieved data we got
2148 * @param intervals_usable intervals we want to keep
2149 * @param data_usable matching data we want to keep
2151 void ReplicatedBackend::trim_pushed_data(
2152 const interval_set
<uint64_t> ©_subset
,
2153 const interval_set
<uint64_t> &intervals_received
,
2154 bufferlist data_received
,
2155 interval_set
<uint64_t> *intervals_usable
,
2156 bufferlist
*data_usable
)
2158 if (intervals_received
.subset_of(copy_subset
)) {
2159 *intervals_usable
= intervals_received
;
2160 *data_usable
= data_received
;
2164 intervals_usable
->intersection_of(copy_subset
,
2165 intervals_received
);
2168 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2169 p
!= intervals_received
.end();
2171 interval_set
<uint64_t> x
;
2172 x
.insert(p
.get_start(), p
.get_len());
2173 x
.intersection_of(copy_subset
);
2174 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
2178 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2179 sub
.substr_of(data_received
, data_off
, q
.get_len());
2180 data_usable
->claim_append(sub
);
2186 void ReplicatedBackend::_failed_push(pg_shard_t from
, const hobject_t
&soid
)
2188 list
<pg_shard_t
> fl
= { from
};
2189 get_parent()->failed_push(fl
, soid
);
2191 clear_pull(pulling
.find(soid
));
2194 void ReplicatedBackend::clear_pull_from(
2195 map
<hobject_t
, PullInfo
>::iterator piter
)
2197 auto from
= piter
->second
.from
;
2198 pull_from_peer
[from
].erase(piter
->second
.soid
);
2199 if (pull_from_peer
[from
].empty())
2200 pull_from_peer
.erase(from
);
2203 void ReplicatedBackend::clear_pull(
2204 map
<hobject_t
, PullInfo
>::iterator piter
,
2205 bool clear_pull_from_peer
)
2207 if (clear_pull_from_peer
) {
2208 clear_pull_from(piter
);
2210 get_parent()->release_locks(piter
->second
.lock_manager
);
2211 pulling
.erase(piter
);
2214 int ReplicatedBackend::start_pushes(
2215 const hobject_t
&soid
,
2216 ObjectContextRef obc
,
2221 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2222 for (set
<pg_shard_t
>::iterator i
=
2223 get_parent()->get_actingbackfill_shards().begin();
2224 i
!= get_parent()->get_actingbackfill_shards().end();
2226 if (*i
== get_parent()->whoami_shard()) continue;
2227 pg_shard_t peer
= *i
;
2228 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2229 get_parent()->get_shard_missing().find(peer
);
2230 assert(j
!= get_parent()->get_shard_missing().end());
2231 if (j
->second
.is_missing(soid
)) {
2233 h
->pushes
[peer
].push_back(PushOp());
2234 prep_push_to_replica(obc
, soid
, peer
,
2235 &(h
->pushes
[peer
].back()), h
->cache_dont_need
);