1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDSubOp.h"
18 #include "messages/MOSDRepOp.h"
19 #include "messages/MOSDSubOpReply.h"
20 #include "messages/MOSDRepOpReply.h"
21 #include "messages/MOSDPGPush.h"
22 #include "messages/MOSDPGPull.h"
23 #include "messages/MOSDPGPushReply.h"
24 #include "common/EventTrace.h"
26 #define dout_context cct
27 #define dout_subsys ceph_subsys_osd
28 #define DOUT_PREFIX_ARGS this
30 #define dout_prefix _prefix(_dout, this)
// Debug-log prefix helper, invoked through the dout_prefix macro defined
// above as _prefix(_dout, this). Streams the prefix string produced by the
// backend's parent listener (get_parent()->gen_dbg_prefix()) into *_dout and
// returns that same stream so further output can be chained.
31 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
32 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
// Completion context that sends a prepared reply once it fires.
// The constructor stores the listener, the reply message and the connection;
// finish() hands reply and the raw connection pointer (conn.get()) to
// pg->send_message_osd_cluster(). The int completion code is ignored
// (the parameter is unnamed).
36 class PG_SendMessageOnConn
: public Context
{
37 PGBackend::Listener
*pg
;
42 PGBackend::Listener
*pg
,
44 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
45 void finish(int) override
{
46 pg
->send_message_osd_cluster(reply
, conn
.get());
// Completion context that defers a piece of recovery work.
// It takes a GenContext<ThreadPool::TPHandle&>* and keeps it in a unique_ptr;
// when finish() runs, ownership is released (c.release()) and the raw pointer
// is passed to pg->schedule_recovery_work(). The int completion code is
// ignored (the parameter is unnamed).
50 class PG_RecoveryQueueAsync
: public Context
{
51 PGBackend::Listener
*pg
;
52 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener
*pg
,
56 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
57 void finish(int) override
{
58 pg
->schedule_recovery_work(c
.release());
// Context fired when a replica-side modification has been applied.
// finish() ignores its int argument's value and calls pg->repop_applied(rm).
// NOTE(review): the declaration and initialisation of `rm` are not visible in
// this excerpt; presumably it is the RepModifyRef passed to the constructor —
// confirm.
63 struct ReplicatedBackend::C_OSD_RepModifyApply
: public Context
{
64 ReplicatedBackend
*pg
;
66 C_OSD_RepModifyApply(ReplicatedBackend
*pg
, RepModifyRef r
)
68 void finish(int r
) override
{
69 pg
->repop_applied(rm
);
// Context constructed from a ReplicatedBackend pointer and a RepModifyRef,
// mirroring the shape of C_OSD_RepModifyApply.
// NOTE(review): the body of finish() is not visible in this excerpt; by
// analogy with C_OSD_RepModifyApply it presumably forwards to the backend's
// commit handler — confirm against the full file.
73 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
74 ReplicatedBackend
*pg
;
76 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
78 void finish(int r
) override
{
// Updates sub-operation performance counters on `logger` for request `op`.
//
// Latency is computed as ceph_clock_now() minus the request's receive stamp.
// The generic counters l_osd_sop and l_osd_sop_lat are bumped in the visible
// code before the subop type is examined. When subop is not l_osd_sop_pull,
// the request's data length is counted as inbound bytes (l_osd_sop_inb) and
// the write- or push-specific byte/latency counters are updated as well; the
// assert("no support subop" == 0) guards against any other subop value.
// l_osd_sop_pull_lat is updated on the remaining (pull) path.
// NOTE(review): the `logger` parameter declaration and the else/closing lines
// are missing from this excerpt; branch structure is inferred from the
// visible conditions.
83 static void log_subop_stats(
85 OpRequestRef op
, int subop
)
87 utime_t now
= ceph_clock_now();
88 utime_t latency
= now
;
89 latency
-= op
->get_req()->get_recv_stamp();
92 logger
->inc(l_osd_sop
);
93 logger
->tinc(l_osd_sop_lat
, latency
);
96 if (subop
!= l_osd_sop_pull
) {
97 uint64_t inb
= op
->get_req()->get_data().length();
98 logger
->inc(l_osd_sop_inb
, inb
);
99 if (subop
== l_osd_sop_w
) {
100 logger
->inc(l_osd_sop_w_inb
, inb
);
101 logger
->tinc(l_osd_sop_w_lat
, latency
);
102 } else if (subop
== l_osd_sop_push
) {
103 logger
->inc(l_osd_sop_push_inb
, inb
);
104 logger
->tinc(l_osd_sop_push_lat
, latency
);
106 assert("no support subop" == 0);
108 logger
->tinc(l_osd_sop_pull_lat
, latency
);
// Constructor. Forwards cct, pg, store, coll and the collection handle c to
// the PGBackend base class; the constructor body itself is empty.
// NOTE(review): some parameter declarations are missing from this excerpt.
112 ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener
*pg
,
115 ObjectStore::CollectionHandle
&c
,
118 PGBackend(cct
, pg
, store
, coll
, c
) {}
// Executes a previously opened recovery operation.
// The generic RecoveryHandle is downcast to RPGHandle, and the pushes and
// pulls accumulated on it are sent at the given priority via send_pushes()
// and send_pulls().
// NOTE(review): the `priority` parameter declaration and whatever happens to
// the handle afterwards are not visible in this excerpt.
120 void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle
*_h
,
124 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
125 send_pushes(priority
, h
->pushes
);
126 send_pulls(priority
, h
->pulls
);
// Starts recovery of a single object, adding work to the RPGHandle obtained
// by downcasting the generic recovery handle.
// The visible code branches on whether the object is missing locally
// (get_parent()->get_local_missing().is_missing(hoid)); elsewhere it calls
// start_pushes() and clears pushing[hoid].
// NOTE(review): most of the body (both branches, the handling of `started`
// and the return values) is missing from this excerpt — consult the full
// file before relying on this summary.
130 int ReplicatedBackend::recover_object(
131 const hobject_t
&hoid
,
133 ObjectContextRef head
,
134 ObjectContextRef obc
,
138 dout(10) << __func__
<< ": " << hoid
<< dendl
;
139 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
140 if (get_parent()->get_local_missing().is_missing(hoid
)) {
150 int started
= start_pushes(
155 pushing
[hoid
].clear();
// Cancels in-flight pulls whose source OSD is marked down in `osdmap`.
// Walks pull_from_peer; for each peer whose OSD is down, every object being
// pulled from it is cancelled on the parent (cancel_pull) and its local pull
// state is dropped via clear_pull(..., false). The peer entry is then erased
// with erase(i++), which advances the iterator before the erased element is
// invalidated.
// NOTE(review): the iterator-advance for peers that are still up is on a
// line not visible in this excerpt.
162 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
164 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
165 i
!= pull_from_peer
.end();
167 if (osdmap
->is_down(i
->first
.osd
)) {
168 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
169 << ", osdmap has it marked down" << dendl
;
170 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
171 j
!= i
->second
.end();
173 get_parent()->cancel_pull(*j
);
174 clear_pull(pulling
.find(*j
), false);
176 pull_from_peer
.erase(i
++);
// Decides, by message type, whether `op` may be processed while the PG is
// inactive. The op is logged at debug level 10 and the request type is
// switched on; MSG_OSD_PG_PULL is the only case label visible here.
// NOTE(review): the returned values for each case are not visible in this
// excerpt.
183 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
185 dout(10) << __func__
<< ": " << op
<< dendl
;
186 switch (op
->get_req()->get_type()) {
187 case MSG_OSD_PG_PULL
:
// Top-level message dispatcher for this backend.
// Switches on the request's message type; the case labels visible here are
// MSG_OSD_PG_PUSH, MSG_OSD_PG_PULL, MSG_OSD_PG_PUSH_REPLY, MSG_OSD_SUBOP,
// MSG_OSD_REPOP and MSG_OSD_REPOPREPLY. For MSG_OSD_SUBOP the request is cast
// to MOSDSubOp and tested for an empty ops vector.
// NOTE(review): the handler invoked by each case and the boolean results are
// on lines missing from this excerpt.
194 bool ReplicatedBackend::handle_message(
198 dout(10) << __func__
<< ": " << op
<< dendl
;
199 switch (op
->get_req()->get_type()) {
200 case MSG_OSD_PG_PUSH
:
204 case MSG_OSD_PG_PULL
:
208 case MSG_OSD_PG_PUSH_REPLY
:
212 case MSG_OSD_SUBOP
: {
213 const MOSDSubOp
*m
= static_cast<const MOSDSubOp
*>(op
->get_req());
214 if (m
->ops
.size() == 0) {
220 case MSG_OSD_REPOP
: {
225 case MSG_OSD_REPOPREPLY
: {
// Drops recovery bookkeeping. Releases, through the parent listener, the
// lock manager held by every entry under `pushing` (a two-level map) and by
// every entry in `pulling`, then clears pull_from_peer.
// NOTE(review): the statements that empty `pushing` and `pulling` themselves
// are not visible in this excerpt.
236 void ReplicatedBackend::clear_recovery_state()
238 // clear pushing/pulling maps
239 for (auto &&i
: pushing
) {
240 for (auto &&j
: i
.second
) {
241 get_parent()->release_locks(j
.second
.lock_manager
);
246 for (auto &&i
: pulling
) {
247 get_parent()->release_locks(i
.second
.lock_manager
);
250 pull_from_peer
.clear();
// Discards every in-progress op, then clears recovery state.
// The loop erases each in_progress_ops entry in its increment expression
// (erase(i++)), after the loop body has deleted the entry's on_commit and
// on_applied callbacks when they are set. The callbacks are deleted, not
// completed. Finally clear_recovery_state() is called.
253 void ReplicatedBackend::on_change()
255 dout(10) << __func__
<< dendl
;
256 for (map
<ceph_tid_t
, InProgressOp
>::iterator i
= in_progress_ops
.begin();
257 i
!= in_progress_ops
.end();
258 in_progress_ops
.erase(i
++)) {
259 if (i
->second
.on_commit
)
260 delete i
->second
.on_commit
;
261 if (i
->second
.on_applied
)
262 delete i
->second
.on_applied
;
264 clear_recovery_state();
267 void ReplicatedBackend::on_flushed()
// Synchronous object read. Forwards directly to store->read() on collection
// handle `ch` for ghobject_t(hoid), reading `len` bytes at offset `off` into
// *bl with the supplied op_flags, and returns store->read()'s result
// unchanged.
// NOTE(review): the declarations of off, len, op_flags and bl are on lines
// missing from this excerpt.
271 int ReplicatedBackend::objects_read_sync(
272 const hobject_t
&hoid
,
278 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
// GenContext<ThreadPool::TPHandle&> wrapper that pairs a result code `r`
// with a plain Context `c`, both captured by the constructor.
// NOTE(review): the bodies of finish() and of the destructor are not visible
// in this excerpt; presumably finish() completes `c` with `r` and the
// destructor disposes of an uncompleted `c` — confirm.
281 struct AsyncReadCallback
: public GenContext
<ThreadPool::TPHandle
&> {
284 AsyncReadCallback(int r
, Context
*c
) : r(r
), c(c
) {}
285 void finish(ThreadPool::TPHandle
&) override
{
289 ~AsyncReadCallback() override
{
// "Asynchronous" read entry point. As the comment in the body says, there is
// no fast read implementation here: each extent in `to_read` is read with a
// direct store->read() call inside the loop, which continues only while the
// overall status `r` is non-negative.
//
// Each to_read element is ((offset, length, flags), (bufferlist*, Context*)).
// When an element carries a per-extent Context, it is wrapped in an
// AsyncReadCallback with that read's result (_r) and queued through
// get_parent()->schedule_recovery_work(bless_gencontext(...)). After the
// loop, on_complete is queued the same way with the overall status `r`.
// NOTE(review): the line that folds _r into r is missing from this excerpt.
293 void ReplicatedBackend::objects_read_async(
294 const hobject_t
&hoid
,
295 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
296 pair
<bufferlist
*, Context
*> > > &to_read
,
297 Context
*on_complete
,
300 // There is no fast read implementation for replication backend yet
304 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
305 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
307 i
!= to_read
.end() && r
>= 0;
309 int _r
= store
->read(ch
, ghobject_t(hoid
), i
->first
.get
<0>(),
310 i
->first
.get
<1>(), *(i
->second
.first
),
312 if (i
->second
.second
) {
313 get_parent()->schedule_recovery_work(
314 get_parent()->bless_gencontext(
315 new AsyncReadCallback(_r
, i
->second
.second
)));
320 get_parent()->schedule_recovery_work(
321 get_parent()->bless_gencontext(
322 new AsyncReadCallback(r
, on_complete
)));
// Context holding a ReplicatedBackend pointer and a pointer to one of its
// InProgressOp records; submit_transaction() registers an instance as the
// local transaction's on-commit callback.
// NOTE(review): the body of finish() is not visible in this excerpt;
// presumably it calls the backend's op_commit(op) — confirm.
325 class C_OSD_OnOpCommit
: public Context
{
326 ReplicatedBackend
*pg
;
327 ReplicatedBackend::InProgressOp
*op
;
329 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
331 void finish(int) override
{
// Context holding a ReplicatedBackend pointer and a pointer to one of its
// InProgressOp records; submit_transaction() registers an instance as the
// local transaction's on-applied callback.
// NOTE(review): the body of finish() is not visible in this excerpt;
// presumably it calls the backend's op_applied(op) — confirm.
336 class C_OSD_OnOpApplied
: public Context
{
337 ReplicatedBackend
*pg
;
338 ReplicatedBackend::InProgressOp
*op
;
340 C_OSD_OnOpApplied(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
342 void finish(int) override
{
// Lowers a PGTransaction into a concrete ObjectStore::Transaction `t`.
//
// Step 1 - log entries: every entry is marked unrollbackable. If the matching
// object op carries updated_snaps, the new snap set is encoded into a
// bufferlist sized from the snap count, and le.snaps is reassigned to the
// osd_pglog mempool.
//
// Step 2 - per-object ops, visited via pgt->safe_create_traverse():
//   * deleted objects are recorded in *removed (the is_fresh_object() branch
//     body is not visible here);
//   * delete_first issues t->remove();
//   * the Init variant is dispatched over None / Create (t->touch) / Clone /
//     Rename (asserts the source is a temp object, then
//     collection_move_rename);
//   * truncate is applied to ->first and, if different, again to ->second;
//   * attribute updates are split into a batch for setattrs and individual
//     rmattr calls;
//   * omap clear, omap header, and omap key removals/insertions are emitted;
//   * the allocation hint is forwarded when present;
//   * buffer updates are dispatched over Write / Zero / CloneRange, with
//     CloneRange asserting its length equals the extent length.
//
// All ghobject_t ids are built with NO_GEN / NO_SHARD.
// NOTE(review): many lines (conditions guarding several steps, and most
// lambda bodies) are missing from this excerpt; the list above covers only
// what is visible.
347 void generate_transaction(
348 PGTransactionUPtr
&pgt
,
350 bool legacy_log_entries
,
351 vector
<pg_log_entry_t
> &log_entries
,
352 ObjectStore::Transaction
*t
,
353 set
<hobject_t
> *added
,
354 set
<hobject_t
> *removed
)
360 for (auto &&le
: log_entries
) {
361 le
.mark_unrollbackable();
362 auto oiter
= pgt
->op_map
.find(le
.soid
);
363 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
364 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
365 ::encode(oiter
->second
.updated_snaps
->second
, bl
);
367 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
371 pgt
->safe_create_traverse(
372 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
373 const hobject_t
&oid
= obj_op
.first
;
374 const ghobject_t goid
=
375 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
376 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
379 if (op
.is_fresh_object()) {
381 } else if (op
.is_delete()) {
382 removed
->insert(oid
);
386 if (op
.delete_first
) {
387 t
->remove(coll
, goid
);
392 [&](const PGTransaction::ObjectOperation::Init::None
&) {
394 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
395 t
->touch(coll
, goid
);
397 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
401 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
404 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
405 assert(op
.source
.is_temp());
406 t
->collection_move_rename(
409 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
415 t
->truncate(coll
, goid
, op
.truncate
->first
);
416 if (op
.truncate
->first
!= op
.truncate
->second
)
417 t
->truncate(coll
, goid
, op
.truncate
->second
);
420 if (!op
.attr_updates
.empty()) {
421 map
<string
, bufferlist
> attrs
;
422 for (auto &&p
: op
.attr_updates
) {
424 attrs
[p
.first
] = *(p
.second
);
426 t
->rmattr(coll
, goid
, p
.first
);
428 t
->setattrs(coll
, goid
, attrs
);
432 t
->omap_clear(coll
, goid
);
434 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
436 for (auto &&up
: op
.omap_updates
) {
437 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
439 case UpdateType::Remove
:
440 t
->omap_rmkeys(coll
, goid
, up
.second
);
442 case UpdateType::Insert
:
443 t
->omap_setkeys(coll
, goid
, up
.second
);
448 // updated_snaps doesn't matter since we marked unrollbackable
451 auto &hint
= *(op
.alloc_hint
);
455 hint
.expected_object_size
,
456 hint
.expected_write_size
,
460 for (auto &&extent
: op
.buffer_updates
) {
461 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
464 [&](const BufferUpdate::Write
&op
) {
472 [&](const BufferUpdate::Zero
&op
) {
479 [&](const BufferUpdate::CloneRange
&op
) {
480 assert(op
.len
== extent
.get_len());
483 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
// Primary-side entry point for a client write.
//
// Visible flow:
//   1. Copies the log entries (generate_transaction mutates them) and takes
//      ownership of the PGTransaction.
//   2. Calls generate_transaction() to build op_t; the legacy-log-entries
//      flag is true when the osdmap's require_osd_release is older than
//      Kraken. At most one object may be added and one removed (asserted).
//   3. Registers a new InProgressOp under `tid` (asserted not to exist) and
//      seeds both its waiting_for_applied and waiting_for_commit sets with
//      all acting/backfill shards.
//   4. The first added / removed object (or an empty hobject_t) is passed on
//      as temp-object ids, and the local temp-object tracking is updated via
//      add_temp_objs / clear_temp_objs.
//   5. Logs the operation on the parent, registers on_local_applied_sync and
//      the C_OSD_OnOpApplied / C_OSD_OnOpCommit callbacks on op_t, and queues
//      op_t as a single-element transaction vector.
//
// The callbacks capture &op, a reference into in_progress_ops.
// NOTE(review): the call that issues the op to replicas, and several
// argument lists, are on lines missing from this excerpt.
493 void ReplicatedBackend::submit_transaction(
494 const hobject_t
&soid
,
495 const object_stat_sum_t
&delta_stats
,
496 const eversion_t
&at_version
,
497 PGTransactionUPtr
&&_t
,
498 const eversion_t
&trim_to
,
499 const eversion_t
&roll_forward_to
,
500 const vector
<pg_log_entry_t
> &_log_entries
,
501 boost::optional
<pg_hit_set_history_t
> &hset_history
,
502 Context
*on_local_applied_sync
,
503 Context
*on_all_acked
,
504 Context
*on_all_commit
,
507 OpRequestRef orig_op
)
513 vector
<pg_log_entry_t
> log_entries(_log_entries
);
514 ObjectStore::Transaction op_t
;
515 PGTransactionUPtr
t(std::move(_t
));
516 set
<hobject_t
> added
, removed
;
517 generate_transaction(
520 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
525 assert(added
.size() <= 1);
526 assert(removed
.size() <= 1);
528 assert(!in_progress_ops
.count(tid
));
529 InProgressOp
&op
= in_progress_ops
.insert(
533 tid
, on_all_commit
, on_all_acked
,
538 op
.waiting_for_applied
.insert(
539 parent
->get_actingbackfill_shards().begin(),
540 parent
->get_actingbackfill_shards().end());
541 op
.waiting_for_commit
.insert(
542 parent
->get_actingbackfill_shards().begin(),
543 parent
->get_actingbackfill_shards().end());
552 added
.size() ? *(added
.begin()) : hobject_t(),
553 removed
.size() ? *(removed
.begin()) : hobject_t(),
559 add_temp_objs(added
);
560 clear_temp_objs(removed
);
562 parent
->log_operation(
570 op_t
.register_on_applied_sync(on_local_applied_sync
);
571 op_t
.register_on_applied(
572 parent
->bless_context(
573 new C_OSD_OnOpApplied(this, &op
)));
574 op_t
.register_on_commit(
575 parent
->bless_context(
576 new C_OSD_OnOpCommit(this, &op
)));
578 vector
<ObjectStore::Transaction
> tls
;
579 tls
.push_back(std::move(op_t
));
581 parent
->queue_transactions(tls
, op
.op
);
// Called when the local transaction for an in-progress op has been applied.
// Emits trace/log events, removes this OSD's shard from
// op->waiting_for_applied and tells the parent the version was applied.
// When no shard is left in waiting_for_applied, op->on_applied is completed
// with 0. The op is erased from in_progress_ops once both callbacks are
// gone (asserted immediately before the erase).
// NOTE(review): the guards around the mark_event calls, the reset of
// on_applied after completion and the "is the op done" test are on lines
// missing from this excerpt.
584 void ReplicatedBackend::op_applied(
588 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_APPLIED_BEGIN", true);
589 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
591 op
->op
->mark_event("op_applied");
592 op
->op
->pg_trace
.event("op applied");
595 op
->waiting_for_applied
.erase(get_parent()->whoami_shard());
596 parent
->op_applied(op
->v
);
598 if (op
->waiting_for_applied
.empty()) {
599 op
->on_applied
->complete(0);
603 assert(!op
->on_commit
&& !op
->on_applied
);
604 in_progress_ops
.erase(op
->tid
);
// Called when the local transaction for an in-progress op has committed.
// Emits trace/log events and removes this OSD's shard from
// op->waiting_for_commit. When no shard is left, op->on_commit is completed
// with 0. The op is erased from in_progress_ops once both callbacks are gone
// (asserted immediately before the erase).
// NOTE(review): the guards around the mark_event calls, the reset of
// on_commit after completion and the "is the op done" test are on lines
// missing from this excerpt.
608 void ReplicatedBackend::op_commit(
612 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
613 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
615 op
->op
->mark_event("op_commit");
616 op
->op
->pg_trace
.event("op commit");
619 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
621 if (op
->waiting_for_commit
.empty()) {
622 op
->on_commit
->complete(0);
626 assert(!op
->on_commit
&& !op
->on_applied
);
627 in_progress_ops
.erase(op
->tid
);
// Handles a replica's MOSDRepOpReply on the primary.
//
// The message is finish_decode()d through its non-const handle, then read
// through a const pointer (type asserted to be MSG_OSD_REPOPREPLY). If the
// reply's tid matches an entry in in_progress_ops:
//   * when ack_type has CEPH_OSD_FLAG_ONDISK set, the sender must be in
//     waiting_for_commit (asserted) and is removed from it; a
//     "sub_op_commit_rec" event is recorded;
//   * the sender must be in waiting_for_applied on the path shown at source
//     line 674 (asserted) and a "sub_op_applied_rec" event is recorded; the
//     sender is removed from waiting_for_applied;
//   * the parent is told the peer's last_complete_ondisk;
//   * on_applied / on_commit are completed with 0 once their waiting sets
//     are empty, and the op is erased when both callbacks are gone.
// NOTE(review): the else/brace structure and second halves of the `&&`
// conditions are on lines missing from this excerpt.
631 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
633 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
634 const MOSDRepOpReply
*r
= static_cast<const MOSDRepOpReply
*>(op
->get_req());
635 assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
639 // must be replication.
640 ceph_tid_t rep_tid
= r
->get_tid();
641 pg_shard_t from
= r
->from
;
643 if (in_progress_ops
.count(rep_tid
)) {
644 map
<ceph_tid_t
, InProgressOp
>::iterator iter
=
645 in_progress_ops
.find(rep_tid
);
646 InProgressOp
&ip_op
= iter
->second
;
647 const MOSDOp
*m
= NULL
;
649 m
= static_cast<const MOSDOp
*>(ip_op
.op
->get_req());
652 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
653 << " ack_type " << (int)r
->ack_type
657 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
658 << " ack_type " << (int)r
->ack_type
664 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
665 assert(ip_op
.waiting_for_commit
.count(from
));
666 ip_op
.waiting_for_commit
.erase(from
);
669 ss
<< "sub_op_commit_rec from " << from
;
670 ip_op
.op
->mark_event_string(ss
.str());
671 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
674 assert(ip_op
.waiting_for_applied
.count(from
));
677 ss
<< "sub_op_applied_rec from " << from
;
678 ip_op
.op
->mark_event_string(ss
.str());
679 ip_op
.op
->pg_trace
.event("sub_op_applied_rec");
682 ip_op
.waiting_for_applied
.erase(from
);
684 parent
->update_peer_last_complete_ondisk(
686 r
->get_last_complete_ondisk());
688 if (ip_op
.waiting_for_applied
.empty() &&
690 ip_op
.on_applied
->complete(0);
691 ip_op
.on_applied
= 0;
693 if (ip_op
.waiting_for_commit
.empty() &&
695 ip_op
.on_commit
->complete(0);
699 assert(!ip_op
.on_commit
&& !ip_op
.on_applied
);
700 in_progress_ops
.erase(iter
);
// Deep-scrubs one object: computes a digest over its data and a digest over
// its omap header and key/value pairs, storing both in `o`.
//
// Two bufferhash accumulators (h for data, oh for omap) start from `seed`.
// Data is read with FADVISE_SEQUENTIAL | FADVISE_DONTNEED using
// cct->_conf->osd_deep_scrub_stride, and handle.reset_tp_timeout() is called
// so a long scrub is not mistaken for a hung thread. o.digest and
// o.digest_present are then set.
//
// The omap header is fetched next; an -EIO result is logged as a read error.
// Each omap key and value is then encoded into bl while iterating, again
// resetting the thread-pool timeout; a negative iterator status after the
// loop is logged as a db status error. Finally o.omap_digest and
// o.omap_digest_present are set.
// NOTE(review): the statements that feed bl into h / oh and that record the
// read_error flags on `o` are on lines missing from this excerpt.
705 void ReplicatedBackend::be_deep_scrub(
706 const hobject_t
&poid
,
709 ThreadPool::TPHandle
&handle
)
711 dout(10) << __func__
<< " " << poid
<< " seed "
712 << std::hex
<< seed
<< std::dec
<< dendl
;
713 bufferhash
h(seed
), oh(seed
);
714 bufferlist bl
, hdrbl
;
718 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
| CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
721 handle
.reset_tp_timeout();
725 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
727 cct
->_conf
->osd_deep_scrub_stride
, bl
,
737 dout(25) << __func__
<< " " << poid
<< " got "
738 << r
<< " on read, read_error" << dendl
;
742 o
.digest
= h
.digest();
743 o
.digest_present
= true;
746 r
= store
->omap_get_header(
749 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
751 // NOTE: bobtail to giant, we would crc the head as (len, head).
752 // that changes at the same time we start using a non-zero seed.
753 if (r
== 0 && hdrbl
.length()) {
754 dout(25) << "CRC header " << string(hdrbl
.c_str(), hdrbl
.length())
764 } else if (r
== -EIO
) {
765 dout(25) << __func__
<< " " << poid
<< " got "
766 << r
<< " on omap header read, read_error" << dendl
;
771 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
774 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
776 for (iter
->seek_to_first(); iter
->status() == 0 && iter
->valid();
778 handle
.reset_tp_timeout();
780 dout(25) << "CRC key " << iter
->key() << " value:\n";
781 iter
->value().hexdump(*_dout
);
784 ::encode(iter
->key(), bl
);
785 ::encode(iter
->value(), bl
);
790 if (iter
->status() < 0) {
791 dout(25) << __func__
<< " " << poid
792 << " on omap scan, db status error" << dendl
;
797 //Store final calculated CRC32 of omap header & key/values
798 o
.omap_digest
= oh
.digest();
799 o
.omap_digest_present
= true;
800 dout(20) << __func__
<< " " << poid
<< " omap_digest "
801 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
// Replica-side handler for an MOSDPGPush message.
//
// If the parent reports failsafe-full, the condition is logged (what happens
// next is not visible here). Otherwise every PushOp in the message is
// handled with handle_push(), which fills one PushReplyOp per push and adds
// to a single ObjectStore::Transaction.
//
// An MOSDPGPushReply is then built, copying priority, map_epoch and
// min_epoch from the request and taking the replies vector by swap. The
// reply is sent by a PG_SendMessageOnConn registered as the transaction's
// on-complete callback, and the transaction is queued.
804 void ReplicatedBackend::_do_push(OpRequestRef op
)
806 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
807 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
808 pg_shard_t from
= m
->from
;
812 vector
<PushReplyOp
> replies
;
813 ObjectStore::Transaction t
;
815 if (get_parent()->check_failsafe_full(ss
)) {
816 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
819 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
820 i
!= m
->pushes
.end();
822 replies
.push_back(PushReplyOp());
823 handle_push(from
, *i
, &(replies
.back()), &t
);
826 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
827 reply
->from
= get_parent()->whoami_shard();
828 reply
->set_priority(m
->get_priority());
829 reply
->pgid
= get_info().pgid
;
830 reply
->map_epoch
= m
->map_epoch
;
831 reply
->min_epoch
= m
->min_epoch
;
832 reply
->replies
.swap(replies
);
833 reply
->compute_cost(cct
);
835 t
.register_on_complete(
836 new PG_SendMessageOnConn(
837 get_parent(), reply
, m
->get_connection()));
839 get_parent()->queue_transaction(std::move(t
));
// Deferred continuation run after one or more pulls have completed.
//
// finish() opens a fresh recovery op on the backend and, for each entry in
// to_continue: looks the object up in bc->pulling (asserted present), saves
// its object context, drops the pull via clear_pull(j, false), then calls
// start_pushes() to push the recovered object onward. On the branch shown at
// source lines 858-860 the pushing entry is cleared and the parent is told
// via primary_failed() / primary_error(); if nothing was started
// (`!started`) the parent is told the object is globally recovered. The
// thread-pool timeout is reset along the way, and the recovery op is finally
// run at the stored priority.
// NOTE(review): the condition guarding the failure branch and the `priority`
// member declaration are on lines missing from this excerpt.
842 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
843 ReplicatedBackend
*bc
;
844 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
846 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
847 : bc(bc
), priority(priority
) {}
849 void finish(ThreadPool::TPHandle
&handle
) override
{
850 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
851 for (auto &&i
: to_continue
) {
852 auto j
= bc
->pulling
.find(i
.hoid
);
853 assert(j
!= bc
->pulling
.end());
854 ObjectContextRef obc
= j
->second
.obc
;
855 bc
->clear_pull(j
, false /* already did it */);
856 int started
= bc
->start_pushes(i
.hoid
, obc
, h
);
858 bc
->pushing
[i
.hoid
].clear();
859 bc
->get_parent()->primary_failed(i
.hoid
);
860 bc
->get_parent()->primary_error(i
.hoid
, obc
->obs
.oi
.version
);
861 } else if (!started
) {
862 bc
->get_parent()->on_global_recover(
865 handle
.reset_tp_timeout();
867 bc
->run_recovery_op(h
, priority
);
// Primary-side handler for an MOSDPGPush that answers one of our pulls.
//
// If the parent reports failsafe-full, the condition is logged (what happens
// next is not visible here). Otherwise each PushOp is passed to
// handle_pull_response(), which writes into the last element of `replies`
// (a follow-up PullOp), into `to_continue` and into transaction t; a new
// empty PullOp is appended after a response that needs more data.
//
// `replies` starts with one placeholder element, so the trailing unused slot
// is erased after the loop. If any pulls completed, a
// C_ReplicatedBackend_OnPullComplete continuation is queued via
// PG_RecoveryQueueAsync on transaction completion. If follow-up pulls
// remain, an MOSDPGPull is built (priority and epochs copied from the
// request) and sent on completion via PG_SendMessageOnConn. The transaction
// is then queued.
871 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
873 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
874 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
875 pg_shard_t from
= m
->from
;
879 vector
<PullOp
> replies(1);
882 if (get_parent()->check_failsafe_full(ss
)) {
883 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push): " << ss
.str() << dendl
;
887 ObjectStore::Transaction t
;
888 list
<pull_complete_info
> to_continue
;
889 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
890 i
!= m
->pushes
.end();
892 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
894 replies
.push_back(PullOp());
896 if (!to_continue
.empty()) {
897 C_ReplicatedBackend_OnPullComplete
*c
=
898 new C_ReplicatedBackend_OnPullComplete(
901 c
->to_continue
.swap(to_continue
);
902 t
.register_on_complete(
903 new PG_RecoveryQueueAsync(
905 get_parent()->bless_gencontext(c
)));
907 replies
.erase(replies
.end() - 1);
909 if (replies
.size()) {
910 MOSDPGPull
*reply
= new MOSDPGPull
;
911 reply
->from
= parent
->whoami_shard();
912 reply
->set_priority(m
->get_priority());
913 reply
->pgid
= get_info().pgid
;
914 reply
->map_epoch
= m
->map_epoch
;
915 reply
->min_epoch
= m
->min_epoch
;
916 reply
->set_pulls(&replies
);
917 reply
->compute_cost(cct
);
919 t
.register_on_complete(
920 new PG_SendMessageOnConn(
921 get_parent(), reply
, m
->get_connection()));
924 get_parent()->queue_transaction(std::move(t
));
// Handles an MOSDPGPull request from a peer.
// The message is accessed non-const because take_pulls() moves the PullOp
// list out of it. For every PullOp a fresh PushOp is appended to
// replies[from] and filled in by handle_pull(); the resulting pushes are
// then sent back to the requester at the request's priority.
927 void ReplicatedBackend::do_pull(OpRequestRef op
)
929 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
930 assert(m
->get_type() == MSG_OSD_PG_PULL
);
931 pg_shard_t from
= m
->from
;
933 map
<pg_shard_t
, vector
<PushOp
> > replies
;
934 vector
<PullOp
> pulls
;
935 m
->take_pulls(&pulls
);
936 for (auto& i
: pulls
) {
937 replies
[from
].push_back(PushOp());
938 handle_pull(from
, i
, &(replies
[from
].back()));
940 send_pushes(m
->get_priority(), replies
);
// Handles an MOSDPGPushReply from a peer we are pushing to.
// Each PushReplyOp is passed to handle_push_reply(), which may fill the last
// element of `replies` with a follow-up PushOp; a new empty slot is appended
// after a reply that needs more data. `replies` starts with one placeholder,
// so the trailing unused slot is erased after the loop. The remaining
// follow-up pushes are keyed by the sender and sent with send_pushes() at
// the reply's priority.
943 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
945 const MOSDPGPushReply
*m
= static_cast<const MOSDPGPushReply
*>(op
->get_req());
946 assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
947 pg_shard_t from
= m
->from
;
949 vector
<PushOp
> replies(1);
950 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
951 i
!= m
->replies
.end();
953 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
955 replies
.push_back(PushOp());
957 replies
.erase(replies
.end() - 1);
959 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
960 _replies
[from
].swap(replies
);
961 send_pushes(m
->get_priority(), _replies
);
// Builds the MOSDRepOp that carries a write to replica `peer`.
//
// The message is addressed to this PG's id on the peer's shard and stamped
// with the current osdmap epoch and the last peering reset epoch.
// If parent->should_send_op(peer, soid) is false, an empty transaction is
// encoded as the payload instead of op_t (the debug line explains that the
// object lies beyond the peer's backfill position). Otherwise op_t is
// encoded and the header's data_off is set from op_t's data alignment.
//
// Log entries are encoded into wr->logbl. pg_stats comes from the peer's own
// pinfo when the peer is incomplete (reflecting backfill progress) and from
// our info otherwise. Trim/roll-forward versions, temp-object ids and hit-set
// history are copied across.
// NOTE(review): the `acks_wanted` local is visibly computed; its use, and the
// return statement, are on lines missing from this excerpt.
964 Message
* ReplicatedBackend::generate_subop(
965 const hobject_t
&soid
,
966 const eversion_t
&at_version
,
969 eversion_t pg_trim_to
,
970 eversion_t pg_roll_forward_to
,
971 hobject_t new_temp_oid
,
972 hobject_t discard_temp_oid
,
973 const vector
<pg_log_entry_t
> &log_entries
,
974 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
975 ObjectStore::Transaction
&op_t
,
977 const pg_info_t
&pinfo
)
979 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
980 // forward the write/update/whatever
981 MOSDRepOp
*wr
= new MOSDRepOp(
982 reqid
, parent
->whoami_shard(),
983 spg_t(get_info().pgid
.pgid
, peer
.shard
),
985 get_osdmap()->get_epoch(),
986 parent
->get_last_peering_reset_epoch(),
989 // ship resulting transaction, log entries, and pg_stats
990 if (!parent
->should_send_op(peer
, soid
)) {
991 dout(10) << "issue_repop shipping empty opt to osd." << peer
992 <<", object " << soid
993 << " beyond MAX(last_backfill_started "
994 << ", pinfo.last_backfill "
995 << pinfo
.last_backfill
<< ")" << dendl
;
996 ObjectStore::Transaction t
;
997 ::encode(t
, wr
->get_data());
999 ::encode(op_t
, wr
->get_data());
1000 wr
->get_header().data_off
= op_t
.get_data_alignment();
1003 ::encode(log_entries
, wr
->logbl
);
1005 if (pinfo
.is_incomplete())
1006 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
1008 wr
->pg_stats
= get_info().stats
;
1010 wr
->pg_trim_to
= pg_trim_to
;
1011 wr
->pg_roll_forward_to
= pg_roll_forward_to
;
1013 wr
->new_temp_oid
= new_temp_oid
;
1014 wr
->discard_temp_oid
= discard_temp_oid
;
1015 wr
->updated_hit_set_history
= hset_hist
;
// Sends a write to every replica in the acting/backfill set.
//
// When there is more than one shard, a "waiting for subops from ..." note
// listing all shards except ourselves is recorded on the op. The function
// then iterates the acting/backfill shards, skipping our own, looks up the
// peer's pg_info_t, builds the replication message with generate_subop(),
// links its trace to the op's pg_trace, and sends it to the peer's OSD with
// the current osdmap epoch.
// NOTE(review): the guards around the op->op accesses and the arguments
// passed to generate_subop() are on lines missing from this excerpt.
1019 void ReplicatedBackend::issue_op(
1020 const hobject_t
&soid
,
1021 const eversion_t
&at_version
,
1024 eversion_t pg_trim_to
,
1025 eversion_t pg_roll_forward_to
,
1026 hobject_t new_temp_oid
,
1027 hobject_t discard_temp_oid
,
1028 const vector
<pg_log_entry_t
> &log_entries
,
1029 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1031 ObjectStore::Transaction
&op_t
)
1034 op
->op
->pg_trace
.event("issue replication ops");
1036 if (parent
->get_actingbackfill_shards().size() > 1) {
1038 set
<pg_shard_t
> replicas
= parent
->get_actingbackfill_shards();
1039 replicas
.erase(parent
->whoami_shard());
1040 ss
<< "waiting for subops from " << replicas
;
1042 op
->op
->mark_sub_op_sent(ss
.str());
1044 for (set
<pg_shard_t
>::const_iterator i
=
1045 parent
->get_actingbackfill_shards().begin();
1046 i
!= parent
->get_actingbackfill_shards().end();
1048 if (*i
== parent
->whoami_shard()) continue;
1049 pg_shard_t peer
= *i
;
1050 const pg_info_t
&pinfo
= parent
->get_shard_info().find(peer
)->second
;
1053 wr
= generate_subop(
1068 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1069 get_parent()->send_message_osd_cluster(
1070 peer
.osd
, wr
, get_osdmap()->get_epoch());
// Replica-side handler for an MOSDRepOp (a write forwarded by the primary).
//
// Preconditions asserted: the message type is MSG_OSD_REPOP, its map_epoch is
// not older than our same_interval_since, the target object is not missing
// locally, and the log payload is non-empty.
//
// A RepModify record is created, remembering the sending OSD, our
// last_complete and the current epoch. The shipped transaction is decoded
// from the message data into rm->opt. Temp-object tracking is updated: a
// new_temp_oid is added; a discard_temp_oid is dropped and, when the shipped
// transaction is empty (so it will never remove the object itself), an
// explicit remove is added to rm->localt.
//
// The log entries are then decoded from logbl (iterator repositioned at
// source line 1127), DONTNEED fadvise is set on the shipped transaction, and
// update_snaps is enabled only when the transaction is non-empty (see the
// in-body comment). Stats and the log operation are applied on the parent,
// commit/applied callbacks are registered, and localt followed by opt are
// queued together.
1075 void ReplicatedBackend::do_repop(OpRequestRef op
)
1077 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1078 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(op
->get_req());
1079 int msg_type
= m
->get_type();
1080 assert(MSG_OSD_REPOP
== msg_type
);
1082 const hobject_t
& soid
= m
->poid
;
1084 dout(10) << __func__
<< " " << soid
1085 << " v " << m
->version
1086 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1087 << " " << m
->logbl
.length()
1091 assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1093 // we better not be missing this.
1094 assert(!parent
->get_log().get_missing().is_missing(soid
));
1096 int ackerosd
= m
->get_source().num();
1100 RepModifyRef
rm(std::make_shared
<RepModify
>());
1102 rm
->ackerosd
= ackerosd
;
1103 rm
->last_complete
= get_info().last_complete
;
1104 rm
->epoch_started
= get_osdmap()->get_epoch();
1106 assert(m
->logbl
.length());
1107 // shipped transaction and log entries
1108 vector
<pg_log_entry_t
> log
;
1110 bufferlist::iterator p
= const_cast<bufferlist
&>(m
->get_data()).begin();
1111 ::decode(rm
->opt
, p
);
1113 if (m
->new_temp_oid
!= hobject_t()) {
1114 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1115 add_temp_obj(m
->new_temp_oid
);
1117 if (m
->discard_temp_oid
!= hobject_t()) {
1118 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1119 if (rm
->opt
.empty()) {
1120 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1121 << " since we won't get the transaction" << dendl
;
1122 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1124 clear_temp_obj(m
->discard_temp_oid
);
1127 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1129 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1131 bool update_snaps
= false;
1132 if (!rm
->opt
.empty()) {
1133 // If the opt is non-empty, we infer we are before
1134 // last_backfill (according to the primary, not our
1135 // not-quite-accurate value), and should update the
1136 // collections now. Otherwise, we do it later on push.
1137 update_snaps
= true;
1139 parent
->update_stats(m
->pg_stats
);
1140 parent
->log_operation(
1142 m
->updated_hit_set_history
,
1144 m
->pg_roll_forward_to
,
1148 rm
->opt
.register_on_commit(
1149 parent
->bless_context(
1150 new C_OSD_RepModifyCommit(this, rm
)));
1151 rm
->localt
.register_on_applied(
1152 parent
->bless_context(
1153 new C_OSD_RepModifyApply(this, rm
)));
1154 vector
<ObjectStore::Transaction
> tls
;
1156 tls
.push_back(std::move(rm
->localt
));
1157 tls
.push_back(std::move(rm
->opt
));
1158 parent
->queue_transactions(tls
, op
);
1159 // op is cleaned up by oncommit/onapply when both are executed
// Replica-side callback run once a replicated op's transaction is applied.
// Records "sub_op_applied" on the op and emits a trace event. Unless the
// commit has already been sent (rm->committed), an MOSDRepOpReply with
// CEPH_OSD_FLAG_ACK is sent to the acker OSD at high priority — the in-body
// comment notes this priority must match the commit reply's. The parent is
// then told the version has been applied.
1162 void ReplicatedBackend::repop_applied(RepModifyRef rm
)
1164 rm
->op
->mark_event("sub_op_applied");
1166 rm
->op
->pg_trace
.event("sup_op_applied");
1168 dout(10) << __func__
<< " on " << rm
<< " op "
1169 << *rm
->op
->get_req() << dendl
;
1170 const Message
*m
= rm
->op
->get_req();
1171 const MOSDRepOp
*req
= static_cast<const MOSDRepOp
*>(m
);
1172 eversion_t version
= req
->version
;
1174 // send ack to acker only if we haven't sent a commit already
1175 if (!rm
->committed
) {
1176 Message
*ack
= new MOSDRepOpReply(
1177 req
, parent
->whoami_shard(),
1178 0, get_osdmap()->get_epoch(), req
->min_epoch
, CEPH_OSD_FLAG_ACK
);
1179 ack
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match commit priority!
1180 ack
->trace
= rm
->op
->pg_trace
;
1181 get_parent()->send_message_osd_cluster(
1182 rm
->ackerosd
, ack
, get_osdmap()->get_epoch());
1185 parent
->op_applied(version
);
// Replica-side callback run once a replicated op's transaction has committed.
// Marks the op as commit-sent, emits a trace event and sets rm->committed
// (which suppresses a later ack in repop_applied). Asserts the acker OSD is
// still up, records last_complete as on-disk with the parent, then sends an
// MOSDRepOpReply with CEPH_OSD_FLAG_ONDISK carrying last_complete to the
// acker at high priority. Finally the write sub-op counters are updated via
// log_subop_stats(..., l_osd_sop_w).
1188 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1190 rm
->op
->mark_commit_sent();
1191 rm
->op
->pg_trace
.event("sup_op_commit");
1192 rm
->committed
= true;
1195 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(rm
->op
->get_req());
1196 assert(m
->get_type() == MSG_OSD_REPOP
);
1197 dout(10) << __func__
<< " on op " << *m
1198 << ", sending commit to osd." << rm
->ackerosd
1200 assert(get_osdmap()->is_up(rm
->ackerosd
));
1202 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1204 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1206 get_parent()->whoami_shard(),
1207 0, get_osdmap()->get_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1208 reply
->set_last_complete_ondisk(rm
->last_complete
);
1209 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1210 reply
->trace
= rm
->op
->pg_trace
;
1211 get_parent()->send_message_osd_cluster(
1212 rm
->ackerosd
, reply
, get_osdmap()->get_epoch());
1214 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1218 // ===========================================================
// Works out which byte ranges of a head object must be pushed as data and
// which can instead be cloned from clones the target already has.
//
// data_subset starts as [0, size) using the object's size from obc. The
// function logs and (per the log text) skips clone subsets when the pool
// allows incomplete clones or osd_recover_clone_overlap is disabled.
//
// Otherwise it walks the clones from newest to oldest, narrowing `prev` to
// the running intersection of clone_overlap intervals. A clone is usable
// only if it is not missing, sorts before last_backfill and can be
// read-locked; the first usable clone's overlap is recorded in clone_subsets
// and added to `cloning`. If `cloning` has more intervals than
// osd_recover_clone_overlap_limit, the locks are released and clone_subsets
// is cleared. Whatever remains in `cloning` is subtracted from data_subset.
// NOTE(review): early returns, the loop's break and the declaration of `c`
// are on lines missing from this excerpt.
1220 void ReplicatedBackend::calc_head_subsets(
1221 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1222 const pg_missing_t
& missing
,
1223 const hobject_t
&last_backfill
,
1224 interval_set
<uint64_t>& data_subset
,
1225 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1226 ObcLockManager
&manager
)
1228 dout(10) << "calc_head_subsets " << head
1229 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1231 uint64_t size
= obc
->obs
.oi
.size
;
1233 data_subset
.insert(0, size
);
1235 if (get_parent()->get_pool().allow_incomplete_clones()) {
1236 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1240 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1241 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1246 interval_set
<uint64_t> cloning
;
1247 interval_set
<uint64_t> prev
;
1249 prev
.insert(0, size
);
1251 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1253 c
.snap
= snapset
.clones
[j
];
1254 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1255 if (!missing
.is_missing(c
) &&
1256 c
< last_backfill
&&
1257 get_parent()->try_lock_for_read(c
, manager
)) {
1258 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1259 << " overlap " << prev
<< dendl
;
1260 clone_subsets
[c
] = prev
;
1261 cloning
.union_of(prev
);
1264 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1265 << " overlap " << prev
<< dendl
;
1269 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1270 dout(10) << "skipping clone, too many holes" << dendl
;
1271 get_parent()->release_locks(manager
);
1272 clone_subsets
.clear();
1276 // what's left for us to push?
1277 data_subset
.subtract(cloning
);
1279 dout(10) << "calc_head_subsets " << head
1280 << " data_subset " << data_subset
1281 << " clone_subsets " << clone_subsets
<< dendl
;
// Works out which byte ranges of a clone object must be pushed as data and
// which can instead be cloned from neighbouring clones the target has.
//
// data_subset starts as [0, size) using snapset.clone_size[soid.snap]. The
// function logs and (per the log text) skips clone subsets when the pool
// allows incomplete clones or osd_recover_clone_overlap is disabled.
//
// Otherwise it locates soid's index i in snapset.clones, then:
//   * scans older clones (j = i-1 down to 0), intersecting `prev` with
//     clone_overlap[clones[j]];
//   * scans newer clones (j = i+1 upward), intersecting `next` with
//     clone_overlap[clones[j-1]] — the overlap is keyed by the older clone of
//     each adjacent pair.
// In both scans a clone is usable only if it is not missing, sorts before
// last_backfill and can be read-locked; a usable clone's overlap is recorded
// in clone_subsets and added to `cloning`.
//
// If `cloning` has more intervals than osd_recover_clone_overlap_limit, the
// locks are released and clone_subsets is cleared. Whatever remains in
// `cloning` is subtracted from data_subset.
// NOTE(review): early returns, the loops' breaks and the declarations of `i`
// and `c` are on lines missing from this excerpt.
1284 void ReplicatedBackend::calc_clone_subsets(
1285 SnapSet
& snapset
, const hobject_t
& soid
,
1286 const pg_missing_t
& missing
,
1287 const hobject_t
&last_backfill
,
1288 interval_set
<uint64_t>& data_subset
,
1289 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1290 ObcLockManager
&manager
)
1292 dout(10) << "calc_clone_subsets " << soid
1293 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1295 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1297 data_subset
.insert(0, size
);
1299 if (get_parent()->get_pool().allow_incomplete_clones()) {
1300 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1304 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1305 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1310 for (i
=0; i
< snapset
.clones
.size(); i
++)
1311 if (snapset
.clones
[i
] == soid
.snap
)
1314 // any overlap with next older clone?
1315 interval_set
<uint64_t> cloning
;
1316 interval_set
<uint64_t> prev
;
1318 prev
.insert(0, size
);
1319 for (int j
=i
-1; j
>=0; j
--) {
1321 c
.snap
= snapset
.clones
[j
];
1322 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1323 if (!missing
.is_missing(c
) &&
1324 c
< last_backfill
&&
1325 get_parent()->try_lock_for_read(c
, manager
)) {
1326 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1327 << " overlap " << prev
<< dendl
;
1328 clone_subsets
[c
] = prev
;
1329 cloning
.union_of(prev
);
1332 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1333 << " overlap " << prev
<< dendl
;
1336 // overlap with next newest?
1337 interval_set
<uint64_t> next
;
1339 next
.insert(0, size
);
1340 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1342 c
.snap
= snapset
.clones
[j
];
1343 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1344 if (!missing
.is_missing(c
) &&
1345 c
< last_backfill
&&
1346 get_parent()->try_lock_for_read(c
, manager
)) {
1347 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1348 << " overlap " << next
<< dendl
;
1349 clone_subsets
[c
] = next
;
1350 cloning
.union_of(next
);
1353 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1354 << " overlap " << next
<< dendl
;
1357 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1358 dout(10) << "skipping clone, too many holes" << dendl
;
1359 get_parent()->release_locks(manager
);
1360 clone_subsets
.clear();
1365 // what's left for us to push?
1366 data_subset
.subtract(cloning
);
1368 dout(10) << "calc_clone_subsets " << soid
1369 << " data_subset " << data_subset
1370 << " clone_subsets " << clone_subsets
<< dendl
;
// Queue a pull of the locally missing object `soid` from a peer.
//
// The source shard is the first element of the shuffled copy of the
// shards that missing_loc lists for soid; it is asserted to be up.  If
// that peer is itself missing version `v`, the version it does have
// (`have`) is pulled instead; the assertion there checks that the log
// entry for soid is a LOST_REVERT.
//
// For snap objects (soid.is_snap()) recovery_info takes the snapset from
// headctx->ssc and its size from clone_size[soid.snap].  Otherwise the
// whole object is requested: copy_subset is [0, (uint64_t)-1) and size is
// (uint64_t)-1.
//
// The resulting PullOp is appended to h->pulls[fromshard] and a matching
// PullInfo is stored in pulling[soid], taking ownership of the lock
// manager.
1373 void ReplicatedBackend::prepare_pull(
1375 const hobject_t
& soid
,
1376 ObjectContextRef headctx
,
1379 assert(get_parent()->get_local_missing().get_items().count(soid
));
1380 eversion_t _v
= get_parent()->get_local_missing().get_items().find(
1383 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1384 get_parent()->get_missing_loc_shards());
1385 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1386 get_parent()->get_shard_missing());
1387 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1388 assert(q
!= missing_loc
.end());
1389 assert(!q
->second
.empty());
// Pick the source shard: shuffle the candidates and take the first one.
1392 vector
<pg_shard_t
> shuffle(q
->second
.begin(), q
->second
.end());
1393 random_shuffle(shuffle
.begin(), shuffle
.end());
1394 vector
<pg_shard_t
>::iterator p
= shuffle
.begin();
1395 assert(get_osdmap()->is_up(p
->osd
));
1396 pg_shard_t fromshard
= *p
;
1398 dout(7) << "pull " << soid
1400 << " on osds " << q
->second
1401 << " from osd." << fromshard
1404 assert(peer_missing
.count(fromshard
));
1405 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1406 if (pmissing
.is_missing(soid
, v
)) {
1407 assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1408 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1409 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1410 << " rather than at version " << v
<< dendl
;
1411 v
= pmissing
.get_items().find(soid
)->second
.have
;
1412 assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1413 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1414 pg_log_entry_t::LOST_REVERT
) &&
1415 (get_parent()->get_log().get_log().objects
.find(
1416 soid
)->second
->reverting_to
==
1420 ObjectRecoveryInfo recovery_info
;
1421 ObcLockManager lock_manager
;
1423 if (soid
.is_snap()) {
1424 assert(!get_parent()->get_local_missing().is_missing(
1426 !get_parent()->get_local_missing().is_missing(
1427 soid
.get_snapdir()));
1430 SnapSetContext
*ssc
= headctx
->ssc
;
1432 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1433 recovery_info
.ss
= ssc
->snapset
;
1435 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1436 get_info().last_backfill
,
1437 recovery_info
.copy_subset
,
1438 recovery_info
.clone_subset
,
1440 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1441 dout(10) << " pulling " << recovery_info
<< dendl
;
1443 assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1444 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1446 // pulling head or unversioned object.
1447 // always pull the whole thing.
1448 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1449 recovery_info
.size
= ((uint64_t)-1);
1452 h
->pulls
[fromshard
].push_back(PullOp());
1453 PullOp
&op
= h
->pulls
[fromshard
].back();
1456 op
.recovery_info
= recovery_info
;
1457 op
.recovery_info
.soid
= soid
;
1458 op
.recovery_info
.version
= v
;
1459 op
.recovery_progress
.data_complete
= false;
1460 op
.recovery_progress
.omap_complete
= false;
1461 op
.recovery_progress
.data_recovered_to
= 0;
1462 op
.recovery_progress
.first
= true;
// Record the in-flight pull; a second pull of the same object is a bug.
1464 assert(!pulling
.count(soid
));
1465 pull_from_peer
[fromshard
].insert(soid
);
1466 PullInfo
&pi
= pulling
[soid
];
1467 pi
.from
= fromshard
;
1469 pi
.head_ctx
= headctx
;
1470 pi
.recovery_info
= op
.recovery_info
;
1471 pi
.recovery_progress
= op
.recovery_progress
;
1472 pi
.cache_dont_need
= h
->cache_dont_need
;
1473 pi
.lock_manager
= std::move(lock_manager
);
1477 * intelligently push an object to a replica. make use of existing
1478 * clones/heads and dup data ranges where possible.
// Prepare the first PushOp (`pop`) for sending `soid` to replica `peer`.
//
// Clone (soid.snap non-zero and below CEPH_NOSNAP): if either the head or
// the snapdir object is missing locally, the clone is pushed raw via the
// short prep_push() overload.  Otherwise the snapset from obc->ssc is
// recorded in pop->recovery_info.ss, and the peer's last_backfill,
// data_subset and clone_subsets are passed to a subset calculation.
// Head (soid.snap == CEPH_NOSNAP): likewise, the peer's missing set and
// last_backfill are passed along with the snapset and subsets.
//
// NOTE(review): the callee names of both subset calculations and the
// final call that receives std::move(lock_manager) are not visible in
// this listing.
1480 int ReplicatedBackend::prep_push_to_replica(
1481 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1482 PushOp
*pop
, bool cache_dont_need
)
1484 const object_info_t
& oi
= obc
->obs
.oi
;
1485 uint64_t size
= obc
->obs
.oi
.size
;
1487 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1488 << " size " << size
<< " to osd." << peer
<< dendl
;
1490 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1491 interval_set
<uint64_t> data_subset
;
1493 ObcLockManager lock_manager
;
1494 // are we doing a clone on the replica?
1495 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1496 hobject_t head
= soid
;
1497 head
.snap
= CEPH_NOSNAP
;
1499 // try to base push off of clones that succeed/precede poid
1500 // we need the head (and current SnapSet) locally to do that.
1501 if (get_parent()->get_local_missing().is_missing(head
)) {
1502 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1503 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1505 hobject_t snapdir
= head
;
1506 snapdir
.snap
= CEPH_SNAPDIR
;
1507 if (get_parent()->get_local_missing().is_missing(snapdir
)) {
1508 dout(15) << "push_to_replica missing snapdir " << snapdir
1509 << ", pushing raw clone" << dendl
;
1510 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1513 SnapSetContext
*ssc
= obc
->ssc
;
1515 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1516 pop
->recovery_info
.ss
= ssc
->snapset
;
1517 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1518 get_parent()->get_shard_missing().find(peer
);
1519 assert(pm
!= get_parent()->get_shard_missing().end());
1520 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1521 get_parent()->get_shard_info().find(peer
);
1522 assert(pi
!= get_parent()->get_shard_info().end());
1526 pi
->second
.last_backfill
,
1527 data_subset
, clone_subsets
,
1529 } else if (soid
.snap
== CEPH_NOSNAP
) {
1530 // pushing head or unversioned object.
1531 // base this partially on replica's clones?
1532 SnapSetContext
*ssc
= obc
->ssc
;
1534 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1537 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1538 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1539 data_subset
, clone_subsets
,
1552 std::move(lock_manager
));
// Whole-object push: no clone subsets are used.  data_subset is
// [0, oi.size) when the object size is non-zero and empty otherwise.
// Delegates to the full prep_push() overload with the object's current
// version and a fresh, empty ObcLockManager.
1555 int ReplicatedBackend::prep_push(ObjectContextRef obc
,
1556 const hobject_t
& soid
, pg_shard_t peer
,
1557 PushOp
*pop
, bool cache_dont_need
)
1559 interval_set
<uint64_t> data_subset
;
1560 if (obc
->obs
.oi
.size
)
1561 data_subset
.insert(0, obc
->obs
.oi
.size
);
1562 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1564 return prep_push(obc
, soid
, peer
,
1565 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1566 pop
, cache_dont_need
, ObcLockManager());
// Full push preparation for `soid` towards `peer`.
//
// Calls begin_peer_recover() on the parent, then fills the PushInfo at
// pushing[soid][peer]: size and object_info from obc, the supplied data
// and clone subsets, the snapset already stored in pop->recovery_info.ss,
// and the requested version.  Ownership of `lock_manager` moves into the
// PushInfo.  build_push_op() is then called and the resulting progress is
// stored back into the PushInfo.
//
// NOTE(review): how the build_push_op() return value `r` is handled is
// not visible in this listing.
1569 int ReplicatedBackend::prep_push(
1570 ObjectContextRef obc
,
1571 const hobject_t
& soid
, pg_shard_t peer
,
1573 interval_set
<uint64_t> &data_subset
,
1574 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1576 bool cache_dont_need
,
1577 ObcLockManager
&&lock_manager
)
1579 get_parent()->begin_peer_recover(peer
, soid
);
1581 PushInfo
&pi
= pushing
[soid
][peer
];
1583 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1584 pi
.recovery_info
.copy_subset
= data_subset
;
1585 pi
.recovery_info
.clone_subset
= clone_subsets
;
1586 pi
.recovery_info
.soid
= soid
;
1587 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1588 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1589 pi
.recovery_info
.version
= version
;
1590 pi
.lock_manager
= std::move(lock_manager
);
1592 ObjectRecoveryProgress new_progress
;
1593 int r
= build_push_op(pi
.recovery_info
,
1594 pi
.recovery_progress
,
1597 &(pi
.stat
), cache_dont_need
);
1600 pi
.recovery_progress
= new_progress
;
// Append to transaction `t` the operations that store one received chunk
// of a recovering object.
//
// Target object: when the chunk is both the first and the last
// (first && complete) data goes straight to recovery_info.soid; otherwise
// it goes to a temp recovery object derived from soid and version, which
// is registered with add_temp_obj().
//
// Operations visible here: remove/touch/truncate of the target to
// recovery_info.size, optional omap header, an allocation hint decoded
// from the OI_ATTR attribute, one write per interval in
// intervals_included (sequential fadvise, plus DONTNEED when
// cache_dont_need is set), optional omap keys, and the attrs.  For the
// temp-object path the temp object is finally unregistered and renamed
// onto recovery_info.soid, and submit_push_complete() is called.
//
// NOTE(review): the conditions guarding the first-chunk setup and the
// final rename are not visible in this listing.
1604 void ReplicatedBackend::submit_push_data(
1605 const ObjectRecoveryInfo
&recovery_info
,
1608 bool cache_dont_need
,
1609 const interval_set
<uint64_t> &intervals_included
,
1610 bufferlist data_included
,
1611 bufferlist omap_header
,
1612 const map
<string
, bufferlist
> &attrs
,
1613 const map
<string
, bufferlist
> &omap_entries
,
1614 ObjectStore::Transaction
*t
)
1616 hobject_t target_oid
;
1617 if (first
&& complete
) {
1618 target_oid
= recovery_info
.soid
;
1620 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1621 recovery_info
.version
);
1623 dout(10) << __func__
<< ": Adding oid "
1624 << target_oid
<< " in the temp collection" << dendl
;
1625 add_temp_obj(target_oid
);
1630 t
->remove(coll
, ghobject_t(target_oid
));
1631 t
->touch(coll
, ghobject_t(target_oid
));
1632 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1633 if (omap_header
.length())
1634 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
// attrs.at() throws if OI_ATTR is absent from the pushed attributes.
1636 bufferlist bv
= attrs
.at(OI_ATTR
);
1637 object_info_t
oi(bv
);
1638 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1639 oi
.expected_object_size
,
1640 oi
.expected_write_size
,
1641 oi
.alloc_hint_flags
);
1644 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1645 if (cache_dont_need
)
1646 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1647 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1648 p
!= intervals_included
.end();
1651 bit
.substr_of(data_included
, off
, p
.get_len());
1652 t
->write(coll
, ghobject_t(target_oid
),
1653 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1657 if (!omap_entries
.empty())
1658 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1660 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1664 dout(10) << __func__
<< ": Removing oid "
1665 << target_oid
<< " from the temp collection" << dendl
;
1666 clear_temp_obj(target_oid
);
1667 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1668 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1669 coll
, ghobject_t(recovery_info
.soid
));
1672 submit_push_complete(recovery_info
, t
);
// Finish a push by filling in the ranges that are shared with other
// objects: for every (source object, interval set) in
// recovery_info.clone_subset, each interval is cloned from the source
// into recovery_info.soid at the same offset via t->clone_range().
1676 void ReplicatedBackend::submit_push_complete(
1677 const ObjectRecoveryInfo
&recovery_info
,
1678 ObjectStore::Transaction
*t
)
1680 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1681 recovery_info
.clone_subset
.begin();
1682 p
!= recovery_info
.clone_subset
.end();
1684 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1685 q
!= p
->second
.end();
1687 dout(15) << " clone_range " << p
->first
<< " "
1688 << q
.get_start() << "~" << q
.get_len() << dendl
;
1689 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1690 q
.get_start(), q
.get_len(), q
.get_start());
// Recompute copy_subset / clone_subset for a clone from the current
// local state.
//
// Non-clone objects (snap == 0 or snap >= CEPH_NOSNAP) are returned
// unchanged.  For clones a copy of recovery_info is made, both subsets
// are cleared, any locks already held in `manager` are released, and the
// subsets are recalculated from ssc->snapset, the local missing set and
// get_info().last_backfill.
//
// NOTE(review): the name of the recalculation callee and the return of
// new_info are not visible in this listing.
1695 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1696 const ObjectRecoveryInfo
& recovery_info
,
1697 SnapSetContext
*ssc
,
1698 ObcLockManager
&manager
)
1700 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1701 return recovery_info
;
1702 ObjectRecoveryInfo new_info
= recovery_info
;
1703 new_info
.copy_subset
.clear();
1704 new_info
.clone_subset
.clear();
1706 get_parent()->release_locks(manager
); // might already have locks
1708 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1709 get_info().last_backfill
,
1710 new_info
.copy_subset
, new_info
.clone_subset
,
// Process one PushOp received in answer to a pull we issued.
//
// A reply with an empty version means the peer does not have the object;
// the pull is failed via _failed_pull().  Otherwise the PullInfo for the
// object is looked up in `pulling`.  If the size was still unknown
// ((uint64_t)-1) it is taken from the reply and copy_subset is narrowed
// to what the peer reported; an unknown version is likewise taken from
// the reply.  Received data is trimmed to copy_subset, progress is
// updated from pop.after_progress, and the chunk is written through
// submit_push_data().  Per-pull stats are accumulated.
//
// On completion the pull is removed from pull_from_peer, the object is
// appended to *to_continue and on_local_recover() is invoked.  `response`
// is filled with the soid, recovery_info and progress for the next
// request.
//
// NOTE(review): the return statements are not visible in this listing,
// so the meaning of the bool result cannot be confirmed from here.
1715 bool ReplicatedBackend::handle_pull_response(
1716 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1717 list
<pull_complete_info
> *to_continue
,
1718 ObjectStore::Transaction
*t
)
1720 interval_set
<uint64_t> data_included
= pop
.data_included
;
1723 dout(10) << "handle_pull_response "
1724 << pop
.recovery_info
1725 << pop
.after_progress
1726 << " data.size() is " << data
.length()
1727 << " data_included: " << data_included
1729 if (pop
.version
== eversion_t()) {
1730 // replica doesn't have it!
1731 _failed_pull(from
, pop
.soid
);
1735 const hobject_t
&hoid
= pop
.soid
;
// Data and its interval description must be either both empty or both
// non-empty.
1736 assert((data_included
.empty() && data
.length() == 0) ||
1737 (!data_included
.empty() && data
.length() > 0));
1739 auto piter
= pulling
.find(hoid
);
1740 if (piter
== pulling
.end()) {
1744 PullInfo
&pi
= piter
->second
;
1745 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1746 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1747 pi
.recovery_info
.copy_subset
.intersection_of(
1748 pop
.recovery_info
.copy_subset
);
1750 // If primary doesn't have object info and didn't know version
1751 if (pi
.recovery_info
.version
== eversion_t()) {
1752 pi
.recovery_info
.version
= pop
.version
;
1755 bool first
= pi
.recovery_progress
.first
;
1757 // attrs only reference the origin bufferlist (decode from
1758 // MOSDPGPush message) whose size is much greater than attrs in
1759 // recovery. If obc cache it (get_obc maybe cache the attr), this
1760 // causes the whole origin bufferlist would not be free until obc
1761 // is evicted from obc cache. So rebuild the bufferlists before
1763 auto attrset
= pop
.attrset
;
1764 for (auto& a
: attrset
) {
1767 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1768 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1769 pi
.recovery_info
= recalc_subsets(
1776 interval_set
<uint64_t> usable_intervals
;
1777 bufferlist usable_data
;
1778 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1783 data_included
= usable_intervals
;
1784 data
.claim(usable_data
);
1787 pi
.recovery_progress
= pop
.after_progress
;
1789 dout(10) << "new recovery_info " << pi
.recovery_info
1790 << ", new progress " << pi
.recovery_progress
1793 bool complete
= pi
.is_complete();
1795 submit_push_data(pi
.recovery_info
, first
,
1796 complete
, pi
.cache_dont_need
,
1797 data_included
, data
,
1803 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1804 pi
.stat
.num_bytes_recovered
+= data
.length();
1807 pi
.stat
.num_objects_recovered
++;
1808 clear_pull_from(piter
);
1809 to_continue
->push_back({hoid
, pi
.stat
});
1810 get_parent()->on_local_recover(
1811 hoid
, pi
.recovery_info
, pi
.obc
, t
);
1814 response
->soid
= pop
.soid
;
1815 response
->recovery_info
= pi
.recovery_info
;
1816 response
->recovery_progress
= pi
.recovery_progress
;
// Apply one PushOp received from `from` on the receiving side.
//
// `first` comes from pop.before_progress.first; the push is complete
// when both data_complete and omap_complete are set in
// pop.after_progress.  The chunk is written via submit_push_data(),
// response->soid is set to the pushed object, and on_local_recover() is
// called with an empty ObjectContextRef.
//
// NOTE(review): whether on_local_recover() is gated on `complete` is not
// visible in this listing.
1821 void ReplicatedBackend::handle_push(
1822 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1823 ObjectStore::Transaction
*t
)
1825 dout(10) << "handle_push "
1826 << pop
.recovery_info
1827 << pop
.after_progress
1831 bool first
= pop
.before_progress
.first
;
1832 bool complete
= pop
.after_progress
.data_complete
&&
1833 pop
.after_progress
.omap_complete
;
1835 response
->soid
= pop
.recovery_info
.soid
;
1836 submit_push_data(pop
.recovery_info
,
1839 true, // must be replicate
1848 get_parent()->on_local_recover(
1849 pop
.recovery_info
.soid
,
1851 ObjectContextRef(), // ok, is replica
// Send the queued PushOps to their target shards.
//
// For each shard a cluster connection is obtained and the shard's ops
// are packed into one or more MOSDPGPush messages.  Ops are added to a
// message while the accumulated cost stays below osd_max_push_cost and
// the op count stays below osd_max_push_objects; the message cost is set
// to the accumulated cost before sending.
//
// Note that the local `uint64_t pushes` counter shadows the `pushes`
// parameter inside the inner loop.
1855 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1857 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1860 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1862 get_osdmap()->get_epoch());
1865 vector
<PushOp
>::iterator j
= i
->second
.begin();
1866 while (j
!= i
->second
.end()) {
1868 uint64_t pushes
= 0;
1869 MOSDPGPush
*msg
= new MOSDPGPush();
1870 msg
->from
= get_parent()->whoami_shard();
1871 msg
->pgid
= get_parent()->primary_spg_t();
1872 msg
->map_epoch
= get_osdmap()->get_epoch();
1873 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1874 msg
->set_priority(prio
);
1876 (j
!= i
->second
.end() &&
1877 cost
< cct
->_conf
->osd_max_push_cost
&&
1878 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1880 dout(20) << __func__
<< ": sending push " << *j
1881 << " to osd." << i
->first
<< dendl
;
1882 cost
+= j
->cost(cct
);
1884 msg
->pushes
.push_back(*j
);
1886 msg
->set_cost(cost
);
1887 get_parent()->send_message_osd_cluster(msg
, con
);
// Send the queued PullOps: for each source shard a cluster connection is
// obtained and a single MOSDPGPull message carrying all of that shard's
// pulls is built and sent.  msg->set_pulls() is handed a pointer to the
// shard's vector in `pulls`.
1892 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1894 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1897 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1899 get_osdmap()->get_epoch());
1902 dout(20) << __func__
<< ": sending pulls " << i
->second
1903 << " to osd." << i
->first
<< dendl
;
1904 MOSDPGPull
*msg
= new MOSDPGPull();
1905 msg
->from
= parent
->whoami_shard();
1906 msg
->set_priority(prio
);
1907 msg
->pgid
= get_parent()->primary_spg_t();
1908 msg
->map_epoch
= get_osdmap()->get_epoch();
1909 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1910 msg
->set_pulls(&i
->second
);
1911 msg
->compute_cost(cct
);
1912 get_parent()->send_message_osd_cluster(msg
, con
);
// Build the next chunk of a push for recovery_info.soid into *out_op,
// resuming from `progress` and reporting the new position through
// *out_progress.
//
// A local ObjectRecoveryProgress can be substituted for out_progress
// (presumably when the caller passes null, as handle_pull() does with 0;
// the guarding condition is not visible in this listing).
//
// First chunk (progress.first): the omap header and the attrs are read
// from the store, object_info_t is decoded from attrset[OI_ATTR], and
// its version is compared with the requested one; a mismatch is reported
// through clog_error().
//
// Budget: `available` starts at osd_recovery_max_chunk.  Omap entries are
// added from omap_recovered_to onwards and their key+value size is
// subtracted from the budget; the stop condition is a non-empty chunk
// that reached osd_recovery_max_omap_entries_per_chunk or an entry that
// no longer fits.  With budget left, copy_subset is intersected with the
// fiemap of the object and the next span starting at data_recovered_to is
// selected into out_op->data_included; each interval is then read from
// the store.  A short read shrinks or drops that interval, drops all
// later ones and marks data as complete.
//
// Finally stats (when provided) and perf counters are updated and the
// version, soid, recovery_info and before/after progress are stored in
// *out_op.
//
// NOTE(review): error-return paths are not visible in this listing.
1916 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1917 const ObjectRecoveryProgress
&progress
,
1918 ObjectRecoveryProgress
*out_progress
,
1920 object_stat_sum_t
*stat
,
1921 bool cache_dont_need
)
1923 ObjectRecoveryProgress _new_progress
;
1925 out_progress
= &_new_progress
;
1926 ObjectRecoveryProgress
&new_progress
= *out_progress
;
1927 new_progress
= progress
;
1929 dout(7) << __func__
<< " " << recovery_info
.soid
1930 << " v " << recovery_info
.version
1931 << " size " << recovery_info
.size
1932 << " recovery_info: " << recovery_info
1935 eversion_t v
= recovery_info
.version
;
1936 if (progress
.first
) {
1937 int r
= store
->omap_get_header(coll
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
1939 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
1942 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
1944 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
1949 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
1952 bufferlist::iterator bliter
= bv
.begin();
1953 ::decode(oi
, bliter
);
1955 dout(0) << __func__
<< ": bad object_info_t: " << recovery_info
.soid
<< dendl
;
1959 // If requestor didn't know the version, use ours
1960 if (v
== eversion_t()) {
1962 } else if (oi
.version
!= v
) {
1963 get_parent()->clog_error() << get_info().pgid
<< " push "
1964 << recovery_info
.soid
<< " v "
1965 << recovery_info
.version
1966 << " failed because local copy is "
1971 new_progress
.first
= false;
1973 // Once we provide the version subsequent requests will have it, so
1974 // at this point it must be known.
1975 assert(v
!= eversion_t());
// Byte budget for this chunk, shared between omap entries and data.
1977 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
1978 if (!progress
.omap_complete
) {
1979 ObjectMap::ObjectMapIterator iter
=
1980 store
->get_omap_iterator(coll
,
1981 ghobject_t(recovery_info
.soid
));
1983 for (iter
->lower_bound(progress
.omap_recovered_to
);
1985 iter
->next(false)) {
1986 if (!out_op
->omap_entries
.empty() &&
1987 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
1988 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
1989 available
<= iter
->key().size() + iter
->value().length()))
1991 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
1993 if ((iter
->key().size() + iter
->value().length()) <= available
)
1994 available
-= (iter
->key().size() + iter
->value().length());
1999 new_progress
.omap_complete
= true;
2001 new_progress
.omap_recovered_to
= iter
->key();
2004 if (available
> 0) {
2005 if (!recovery_info
.copy_subset
.empty()) {
2006 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
2007 map
<uint64_t, uint64_t> m
;
2008 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
2009 copy_subset
.range_end(), m
);
2011 interval_set
<uint64_t> fiemap_included(m
);
2012 copy_subset
.intersection_of(fiemap_included
);
2014 // intersection of copy_subset and empty interval_set would be empty anyway
2015 copy_subset
.clear();
2018 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
2020 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
2021 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2023 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2026 out_op
->data_included
.clear();
2029 for (interval_set
<uint64_t>::iterator p
= out_op
->data_included
.begin();
2030 p
!= out_op
->data_included
.end();
2033 int r
= store
->read(ch
, ghobject_t(recovery_info
.soid
),
2034 p
.get_start(), p
.get_len(), bit
,
2035 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
// Debug hook: with osd_debug_random_push_read_error set, a read error is
// injected at random.
2036 if (cct
->_conf
->osd_debug_random_push_read_error
&&
2037 (rand() % (int)(cct
->_conf
->osd_debug_random_push_read_error
* 100.0)) == 0) {
2038 dout(0) << __func__
<< ": inject EIO " << recovery_info
.soid
<< dendl
;
// Short read: the store returned fewer bytes than the interval length.
2044 if (p
.get_len() != bit
.length()) {
2045 dout(10) << " extent " << p
.get_start() << "~" << p
.get_len()
2046 << " is actually " << p
.get_start() << "~" << bit
.length()
2048 interval_set
<uint64_t>::iterator save
= p
++;
2049 if (bit
.length() == 0)
2050 out_op
->data_included
.erase(save
); //Remove this empty interval
2052 save
.set_len(bit
.length());
2053 // Remove any other intervals present
2054 while (p
!= out_op
->data_included
.end()) {
2055 interval_set
<uint64_t>::iterator save
= p
++;
2056 out_op
->data_included
.erase(save
);
2058 new_progress
.data_complete
= true;
2059 out_op
->data
.claim_append(bit
);
2062 out_op
->data
.claim_append(bit
);
2065 if (new_progress
.is_complete(recovery_info
)) {
2066 new_progress
.data_complete
= true;
2068 stat
->num_objects_recovered
++;
2072 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2073 stat
->num_bytes_recovered
+= out_op
->data
.length();
2076 get_parent()->get_logger()->inc(l_osd_push
);
2077 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2080 out_op
->version
= v
;
2081 out_op
->soid
= recovery_info
.soid
;
2082 out_op
->recovery_info
= recovery_info
;
2083 out_op
->after_progress
= new_progress
;
2084 out_op
->before_progress
= progress
;
// Turn *op into a "blank" reply for `soid`: both op->version and
// op->recovery_info.version are reset to the default eversion_t().
// handle_pull_response() treats a reply with an empty version as
// "replica doesn't have it".
2088 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2090 op
->recovery_info
.version
= eversion_t();
2091 op
->version
= eversion_t();
// Handle a peer's acknowledgement of a push chunk.
//
// Replies for objects or peers that are not in `pushing` are only logged.
// Otherwise, while data is not complete and no error has been recorded,
// the next chunk is built into *reply with build_push_op() and the stored
// progress advanced.  When the push to this peer is finished,
// on_peer_recover() is called, the PushInfo's locks are released and the
// peer entry is erased.  Once no peers remain for the object, either
// on_global_recover() or on_primary_error() is reported and the object is
// dropped from `pushing`.
//
// The error flag is kept on the first remaining PushInfo of the object
// (pushing[soid].begin()) so it survives the erase of the current peer.
//
// NOTE(review): the return statements are not visible in this listing,
// so the meaning of the bool result cannot be confirmed from here.
2095 bool ReplicatedBackend::handle_push_reply(
2096 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2098 const hobject_t
&soid
= op
.soid
;
2099 if (pushing
.count(soid
) == 0) {
2100 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2101 << ", or anybody else"
2104 } else if (pushing
[soid
].count(peer
) == 0) {
2105 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2109 PushInfo
*pi
= &pushing
[soid
][peer
];
2110 bool error
= pushing
[soid
].begin()->second
.recovery_progress
.error
;
2112 if (!pi
->recovery_progress
.data_complete
&& !error
) {
2113 dout(10) << " pushing more from, "
2114 << pi
->recovery_progress
.data_recovered_to
2115 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2116 ObjectRecoveryProgress new_progress
;
2117 int r
= build_push_op(
2119 pi
->recovery_progress
, &new_progress
, reply
,
2121 // Handle the case of a read error right after we wrote, which is
2122 // hopefully extremely rare.
2124 dout(5) << __func__
<< ": oid " << soid
<< " error " << r
<< dendl
;
2129 pi
->recovery_progress
= new_progress
;
2135 get_parent()->on_peer_recover( peer
, soid
, pi
->recovery_info
);
2137 get_parent()->release_locks(pi
->lock_manager
);
// Copy what is still needed out of *pi before its map entry is erased.
2138 object_stat_sum_t stat
= pi
->stat
;
2139 eversion_t v
= pi
->recovery_info
.version
;
2140 pushing
[soid
].erase(peer
);
2143 if (pushing
[soid
].empty()) {
2145 get_parent()->on_global_recover(soid
, stat
);
2147 get_parent()->on_primary_error(soid
, v
);
2149 pushing
.erase(soid
);
2151 // This looks weird, but we erased the current peer and need to remember
2152 // the error on any other one, while getting more acks.
2154 pushing
[soid
].begin()->second
.recovery_progress
.error
= true;
2155 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2156 << pushing
[soid
].size() << " others" << dendl
;
// Serve a pull request from `peer`: build the PushOp `reply` for op.soid.
//
// The object is stat'ed first; on failure an error is written to the
// cluster log and a blank reply is prepared with prep_push_op_blank().
// For the first chunk of a request whose size is unknown ((uint64_t)-1),
// size and copy_subset in the request are replaced with the on-disk size
// (clone_subset is asserted to be empty).  The chunk is then produced by
// build_push_op(); a blank reply is also prepared on that call's failure
// path.
//
// Note that `op` is modified in place (recovery_info is a reference into
// it).
//
// NOTE(review): the exact failure conditions on `r` are not visible in
// this listing.
2163 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2165 const hobject_t
&soid
= op
.soid
;
2167 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2169 get_parent()->clog_error() << get_info().pgid
<< " "
2170 << peer
<< " tried to pull " << soid
2171 << " but got " << cpp_strerror(-r
);
2172 prep_push_op_blank(soid
, reply
);
2174 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2175 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2176 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2177 // Adjust size and copy_subset
2178 recovery_info
.size
= st
.st_size
;
2179 recovery_info
.copy_subset
.clear();
2181 recovery_info
.copy_subset
.insert(0, st
.st_size
);
2182 assert(recovery_info
.clone_subset
.empty());
2185 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2187 prep_push_op_blank(soid
, reply
);
2192 * trim received data to remove what we don't want
2194 * @param copy_subset intervals we want
2195 * @param intervals_received intervals we got
2196 * @param data_received data we got
2197 * @param intervals_usable intervals we want to keep
2198 * @param data_usable matching data we want to keep
// Keep only the part of the received data that falls inside copy_subset.
//
// If every received interval already lies within copy_subset, the
// intervals and the data are handed back unchanged.  Otherwise
// *intervals_usable becomes the intersection of copy_subset and
// intervals_received, and *data_usable is assembled by walking the
// received intervals and appending, for each overlapping piece, the
// matching substring of data_received.
2200 void ReplicatedBackend::trim_pushed_data(
2201 const interval_set
<uint64_t> ©_subset
,
2202 const interval_set
<uint64_t> &intervals_received
,
2203 bufferlist data_received
,
2204 interval_set
<uint64_t> *intervals_usable
,
2205 bufferlist
*data_usable
)
2207 if (intervals_received
.subset_of(copy_subset
)) {
2208 *intervals_usable
= intervals_received
;
2209 *data_usable
= data_received
;
2213 intervals_usable
->intersection_of(copy_subset
,
2214 intervals_received
);
2217 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2218 p
!= intervals_received
.end();
2220 interval_set
<uint64_t> x
;
2221 x
.insert(p
.get_start(), p
.get_len());
2222 x
.intersection_of(copy_subset
);
2223 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
// Position of this piece within data_received: offset of the enclosing
// received interval plus the piece's distance from that interval's start.
2227 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2228 sub
.substr_of(data_received
, data_off
, q
.get_len());
2229 data_usable
->claim_append(sub
);
// Record that pulling `soid` from shard `from` failed: the parent is
// notified through failed_push() with a one-element shard list, and the
// pull state for the object is dropped with clear_pull().
2235 void ReplicatedBackend::_failed_pull(pg_shard_t from
, const hobject_t
&soid
)
2237 dout(20) << __func__
<< ": " << soid
<< " from " << from
<< dendl
;
2238 list
<pg_shard_t
> fl
= { from
};
2239 get_parent()->failed_push(fl
, soid
);
2241 clear_pull(pulling
.find(soid
));
// Remove the object referenced by `piter` from the per-peer index
// pull_from_peer, and drop the peer's entry altogether once its set of
// objects becomes empty.  The `pulling` entry itself is left in place.
2244 void ReplicatedBackend::clear_pull_from(
2245 map
<hobject_t
, PullInfo
>::iterator piter
)
2247 auto from
= piter
->second
.from
;
2248 pull_from_peer
[from
].erase(piter
->second
.soid
);
2249 if (pull_from_peer
[from
].empty())
2250 pull_from_peer
.erase(from
);
// Drop all state for the pull referenced by `piter`: optionally remove it
// from pull_from_peer (when clear_pull_from_peer is set), release the
// locks held in its lock manager, and erase it from `pulling`.  `piter`
// is invalid afterwards.
2253 void ReplicatedBackend::clear_pull(
2254 map
<hobject_t
, PullInfo
>::iterator piter
,
2255 bool clear_pull_from_peer
)
2257 if (clear_pull_from_peer
) {
2258 clear_pull_from(piter
);
2260 get_parent()->release_locks(piter
->second
.lock_manager
);
2261 pulling
.erase(piter
);
// Queue pushes of `soid` to every other shard that is missing it.
//
// All actingbackfill shards except ourselves are inspected; those whose
// missing set contains soid are collected.  For each, a PushOp is
// appended to h->pushes[peer] and filled in by prep_push_to_replica().
// If preparing a push fails, the PushOps appended so far (up to and
// including the failing peer) are popped again.
//
// Returns the number of shards that need the object.
//
// NOTE(review): the failure condition on `r` and any early return on
// that path are not visible in this listing.
2264 int ReplicatedBackend::start_pushes(
2265 const hobject_t
&soid
,
2266 ObjectContextRef obc
,
2269 list
< map
<pg_shard_t
, pg_missing_t
>::const_iterator
> shards
;
2271 dout(20) << __func__
<< " soid " << soid
<< dendl
;
2273 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2274 for (set
<pg_shard_t
>::iterator i
=
2275 get_parent()->get_actingbackfill_shards().begin();
2276 i
!= get_parent()->get_actingbackfill_shards().end();
2278 if (*i
== get_parent()->whoami_shard()) continue;
2279 pg_shard_t peer
= *i
;
2280 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2281 get_parent()->get_shard_missing().find(peer
);
2282 assert(j
!= get_parent()->get_shard_missing().end());
2283 if (j
->second
.is_missing(soid
)) {
2284 shards
.push_back(j
);
2288 // If more than 1 read will occur ignore possible request to not cache
2289 bool cache
= shards
.size() == 1 ? h
->cache_dont_need
: false;
2291 for (auto j
: shards
) {
2292 pg_shard_t peer
= j
->first
;
2293 h
->pushes
[peer
].push_back(PushOp());
2294 int r
= prep_push_to_replica(obc
, soid
, peer
,
2295 &(h
->pushes
[peer
].back()), cache
);
2297 // Back out all failed reads
2298 for (auto k
: shards
) {
2299 pg_shard_t p
= k
->first
;
2300 dout(10) << __func__
<< " clean up peer " << p
<< dendl
;
2301 h
->pushes
[p
].pop_back();
2302 if (p
== peer
) break;
2307 return shards
.size();