1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDSubOp.h"
18 #include "messages/MOSDRepOp.h"
19 #include "messages/MOSDSubOpReply.h"
20 #include "messages/MOSDRepOpReply.h"
21 #include "messages/MOSDPGPush.h"
22 #include "messages/MOSDPGPull.h"
23 #include "messages/MOSDPGPushReply.h"
24 #include "common/EventTrace.h"
26 #define dout_context cct
27 #define dout_subsys ceph_subsys_osd
28 #define DOUT_PREFIX_ARGS this
30 #define dout_prefix _prefix(_dout, this)
31 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
32 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
// Completion context: when finish() runs it hands `reply` and the raw
// connection pointer (conn.get()) to the listener's
// send_message_osd_cluster().
// NOTE(review): the embedded numbering in this listing jumps 37 -> 42 -> 44,
// so the member declarations for `reply`/`conn`, the constructor name and the
// `reply` parameter are not visible here — confirm them against the real file.
36 class PG_SendMessageOnConn
: public Context
{
37 PGBackend::Listener
*pg
;
42 PGBackend::Listener
*pg
,
44 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
// The int completion code is ignored; the message is sent regardless.
45 void finish(int) override
{
46 pg
->send_message_osd_cluster(reply
, conn
.get());
50 class PG_RecoveryQueueAsync
: public Context
{
51 PGBackend::Listener
*pg
;
52 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener
*pg
,
56 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
57 void finish(int) override
{
58 pg
->schedule_recovery_work(c
.release());
// Completion context whose finish() calls pg->repop_applied(rm).
// NOTE(review): the listing skips original lines 65 and 67, so the
// declaration of `rm` and the constructor's initializer list are not visible
// here; presumably `rm` is initialised from the constructor's `r` — confirm.
63 struct ReplicatedBackend::C_OSD_RepModifyApply
: public Context
{
64 ReplicatedBackend
*pg
;
66 C_OSD_RepModifyApply(ReplicatedBackend
*pg
, RepModifyRef r
)
// The int argument is unused by the visible body.
68 void finish(int r
) override
{
69 pg
->repop_applied(rm
);
// Commit-side counterpart of C_OSD_RepModifyApply: a Context holding a
// ReplicatedBackend pointer and constructed from (pg, RepModifyRef).
// NOTE(review): the body of finish() (original lines 79 onward) and lines 75
// and 77 are missing from this listing, so what finish() does cannot be
// stated from here — check the real file.
73 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
74 ReplicatedBackend
*pg
;
76 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
78 void finish(int r
) override
{
// Records perf counters for a completed sub-operation.
//   op     request whose receive stamp and data length are sampled
//   subop  counter id; compared against l_osd_sop_pull / _w / _push below
// NOTE(review): `logger` is used throughout but its declaration (original
// line 84) is missing from this listing, as are lines 86, 90-91, 94-95, 105
// and 107; other statements may exist in the real file.
83 static void log_subop_stats(
85 OpRequestRef op
, int subop
)
// Latency = current time minus the time the request was received.
87 utime_t now
= ceph_clock_now();
88 utime_t latency
= now
;
89 latency
-= op
->get_req()->get_recv_stamp();
// Counters updated for every sub-op type.
92 logger
->inc(l_osd_sop
);
93 logger
->tinc(l_osd_sop_lat
, latency
);
// Everything except a pull accounts the request's data length as inbound
// bytes.
96 if (subop
!= l_osd_sop_pull
) {
97 uint64_t inb
= op
->get_req()->get_data().length();
98 logger
->inc(l_osd_sop_inb
, inb
);
// Per-type inbound bytes and latency for writes and pushes.
99 if (subop
== l_osd_sop_w
) {
100 logger
->inc(l_osd_sop_w_inb
, inb
);
101 logger
->tinc(l_osd_sop_w_lat
, latency
);
102 } else if (subop
== l_osd_sop_push
) {
103 logger
->inc(l_osd_sop_push_inb
, inb
);
104 logger
->tinc(l_osd_sop_push_lat
, latency
);
// A string literal never compares equal to 0, so this assert always fires
// when reached; presumably it guards the unsupported-subop branch.
106 assert("no support subop" == 0);
// Pull latency; presumably the else-arm of the `!= l_osd_sop_pull` test above.
108 logger
->tinc(l_osd_sop_pull_lat
, latency
);
112 ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener
*pg
,
115 ObjectStore::CollectionHandle
&c
,
118 PGBackend(cct
, pg
, store
, coll
, c
) {}
// Dispatches the work accumulated in a recovery handle: the handle is
// downcast to RPGHandle and its pushes, pulls and deletes are passed to
// send_pushes(), send_pulls() and send_recovery_deletes() at `priority`.
// NOTE(review): the declaration of `priority` (original line 122) and
// whatever follows line 127 (128-130) are missing from this listing —
// confirm against the real file, in particular who frees the handle.
120 void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle
*_h
,
// static_cast: the caller is trusted to pass an RPGHandle.
124 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
125 send_pushes(priority
, h
->pushes
);
126 send_pulls(priority
, h
->pulls
);
127 send_recovery_deletes(priority
, h
->deletes
);
131 int ReplicatedBackend::recover_object(
132 const hobject_t
&hoid
,
134 ObjectContextRef head
,
135 ObjectContextRef obc
,
139 dout(10) << __func__
<< ": " << hoid
<< dendl
;
140 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
141 if (get_parent()->get_local_missing().is_missing(hoid
)) {
151 int started
= start_pushes(
156 pushing
[hoid
].clear();
163 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
165 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
166 i
!= pull_from_peer
.end();
168 if (osdmap
->is_down(i
->first
.osd
)) {
169 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
170 << ", osdmap has it marked down" << dendl
;
171 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
172 j
!= i
->second
.end();
174 get_parent()->cancel_pull(*j
);
175 clear_pull(pulling
.find(*j
), false);
177 pull_from_peer
.erase(i
++);
184 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
186 dout(10) << __func__
<< ": " << op
<< dendl
;
187 switch (op
->get_req()->get_type()) {
188 case MSG_OSD_PG_PULL
:
195 bool ReplicatedBackend::_handle_message(
199 dout(10) << __func__
<< ": " << op
<< dendl
;
200 switch (op
->get_req()->get_type()) {
201 case MSG_OSD_PG_PUSH
:
205 case MSG_OSD_PG_PULL
:
209 case MSG_OSD_PG_PUSH_REPLY
:
213 case MSG_OSD_SUBOP
: {
214 const MOSDSubOp
*m
= static_cast<const MOSDSubOp
*>(op
->get_req());
215 if (m
->ops
.size() == 0) {
221 case MSG_OSD_REPOP
: {
226 case MSG_OSD_REPOPREPLY
: {
// Releases the lock managers held by every in-flight push (per object, per
// peer entry) and every in-flight pull, then empties pull_from_peer.
// NOTE(review): original lines 243-246 and 249-250 are missing from this
// listing; whether `pushing` and `pulling` themselves are cleared after
// their loops cannot be seen from here — confirm against the real file.
237 void ReplicatedBackend::clear_recovery_state()
239 // clear pushing/pulling maps
240 for (auto &&i
: pushing
) {
241 for (auto &&j
: i
.second
) {
242 get_parent()->release_locks(j
.second
.lock_manager
);
247 for (auto &&i
: pulling
) {
248 get_parent()->release_locks(i
.second
.lock_manager
);
251 pull_from_peer
.clear();
254 void ReplicatedBackend::on_change()
256 dout(10) << __func__
<< dendl
;
257 for (map
<ceph_tid_t
, InProgressOp
>::iterator i
= in_progress_ops
.begin();
258 i
!= in_progress_ops
.end();
259 in_progress_ops
.erase(i
++)) {
260 if (i
->second
.on_commit
)
261 delete i
->second
.on_commit
;
262 if (i
->second
.on_applied
)
263 delete i
->second
.on_applied
;
265 clear_recovery_state();
268 void ReplicatedBackend::on_flushed()
// Synchronous object read: forwards directly to store->read() on collection
// handle `ch` for ghobject_t(hoid) and returns its result unchanged.
// NOTE(review): the declarations of `off`, `len`, `op_flags` and `bl`
// (original lines 274-278) are missing from this listing; `bl` is
// dereferenced without a null check in the visible code.
272 int ReplicatedBackend::objects_read_sync(
273 const hobject_t
&hoid
,
279 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
282 struct AsyncReadCallback
: public GenContext
<ThreadPool::TPHandle
&> {
285 AsyncReadCallback(int r
, Context
*c
) : r(r
), c(c
) {}
286 void finish(ThreadPool::TPHandle
&) override
{
290 ~AsyncReadCallback() override
{
294 void ReplicatedBackend::objects_read_async(
295 const hobject_t
&hoid
,
296 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
297 pair
<bufferlist
*, Context
*> > > &to_read
,
298 Context
*on_complete
,
301 // There is no fast read implementation for replication backend yet
305 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
306 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
308 i
!= to_read
.end() && r
>= 0;
310 int _r
= store
->read(ch
, ghobject_t(hoid
), i
->first
.get
<0>(),
311 i
->first
.get
<1>(), *(i
->second
.first
),
313 if (i
->second
.second
) {
314 get_parent()->schedule_recovery_work(
315 get_parent()->bless_gencontext(
316 new AsyncReadCallback(_r
, i
->second
.second
)));
321 get_parent()->schedule_recovery_work(
322 get_parent()->bless_gencontext(
323 new AsyncReadCallback(r
, on_complete
)));
326 class C_OSD_OnOpCommit
: public Context
{
327 ReplicatedBackend
*pg
;
328 ReplicatedBackend::InProgressOp
*op
;
330 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
332 void finish(int) override
{
337 class C_OSD_OnOpApplied
: public Context
{
338 ReplicatedBackend
*pg
;
339 ReplicatedBackend::InProgressOp
*op
;
341 C_OSD_OnOpApplied(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
343 void finish(int) override
{
348 void generate_transaction(
349 PGTransactionUPtr
&pgt
,
351 bool legacy_log_entries
,
352 vector
<pg_log_entry_t
> &log_entries
,
353 ObjectStore::Transaction
*t
,
354 set
<hobject_t
> *added
,
355 set
<hobject_t
> *removed
)
361 for (auto &&le
: log_entries
) {
362 le
.mark_unrollbackable();
363 auto oiter
= pgt
->op_map
.find(le
.soid
);
364 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
365 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
366 ::encode(oiter
->second
.updated_snaps
->second
, bl
);
368 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
372 pgt
->safe_create_traverse(
373 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
374 const hobject_t
&oid
= obj_op
.first
;
375 const ghobject_t goid
=
376 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
377 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
380 if (op
.is_fresh_object()) {
382 } else if (op
.is_delete()) {
383 removed
->insert(oid
);
387 if (op
.delete_first
) {
388 t
->remove(coll
, goid
);
393 [&](const PGTransaction::ObjectOperation::Init::None
&) {
395 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
396 t
->touch(coll
, goid
);
398 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
402 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
405 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
406 assert(op
.source
.is_temp());
407 t
->collection_move_rename(
410 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
416 t
->truncate(coll
, goid
, op
.truncate
->first
);
417 if (op
.truncate
->first
!= op
.truncate
->second
)
418 t
->truncate(coll
, goid
, op
.truncate
->second
);
421 if (!op
.attr_updates
.empty()) {
422 map
<string
, bufferlist
> attrs
;
423 for (auto &&p
: op
.attr_updates
) {
425 attrs
[p
.first
] = *(p
.second
);
427 t
->rmattr(coll
, goid
, p
.first
);
429 t
->setattrs(coll
, goid
, attrs
);
433 t
->omap_clear(coll
, goid
);
435 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
437 for (auto &&up
: op
.omap_updates
) {
438 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
440 case UpdateType::Remove
:
441 t
->omap_rmkeys(coll
, goid
, up
.second
);
443 case UpdateType::Insert
:
444 t
->omap_setkeys(coll
, goid
, up
.second
);
449 // updated_snaps doesn't matter since we marked unrollbackable
452 auto &hint
= *(op
.alloc_hint
);
456 hint
.expected_object_size
,
457 hint
.expected_write_size
,
461 for (auto &&extent
: op
.buffer_updates
) {
462 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
465 [&](const BufferUpdate::Write
&op
) {
473 [&](const BufferUpdate::Zero
&op
) {
480 [&](const BufferUpdate::CloneRange
&op
) {
481 assert(op
.len
== extent
.get_len());
484 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
494 void ReplicatedBackend::submit_transaction(
495 const hobject_t
&soid
,
496 const object_stat_sum_t
&delta_stats
,
497 const eversion_t
&at_version
,
498 PGTransactionUPtr
&&_t
,
499 const eversion_t
&trim_to
,
500 const eversion_t
&roll_forward_to
,
501 const vector
<pg_log_entry_t
> &_log_entries
,
502 boost::optional
<pg_hit_set_history_t
> &hset_history
,
503 Context
*on_local_applied_sync
,
504 Context
*on_all_acked
,
505 Context
*on_all_commit
,
508 OpRequestRef orig_op
)
514 vector
<pg_log_entry_t
> log_entries(_log_entries
);
515 ObjectStore::Transaction op_t
;
516 PGTransactionUPtr
t(std::move(_t
));
517 set
<hobject_t
> added
, removed
;
518 generate_transaction(
521 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
526 assert(added
.size() <= 1);
527 assert(removed
.size() <= 1);
529 assert(!in_progress_ops
.count(tid
));
530 InProgressOp
&op
= in_progress_ops
.insert(
534 tid
, on_all_commit
, on_all_acked
,
539 op
.waiting_for_applied
.insert(
540 parent
->get_actingbackfill_shards().begin(),
541 parent
->get_actingbackfill_shards().end());
542 op
.waiting_for_commit
.insert(
543 parent
->get_actingbackfill_shards().begin(),
544 parent
->get_actingbackfill_shards().end());
553 added
.size() ? *(added
.begin()) : hobject_t(),
554 removed
.size() ? *(removed
.begin()) : hobject_t(),
560 add_temp_objs(added
);
561 clear_temp_objs(removed
);
563 parent
->log_operation(
571 op_t
.register_on_applied_sync(on_local_applied_sync
);
572 op_t
.register_on_applied(
573 parent
->bless_context(
574 new C_OSD_OnOpApplied(this, &op
)));
575 op_t
.register_on_commit(
576 parent
->bless_context(
577 new C_OSD_OnOpCommit(this, &op
)));
579 vector
<ObjectStore::Transaction
> tls
;
580 tls
.push_back(std::move(op_t
));
582 parent
->queue_transactions(tls
, op
.op
);
585 void ReplicatedBackend::op_applied(
589 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_APPLIED_BEGIN", true);
590 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
592 op
->op
->mark_event("op_applied");
593 op
->op
->pg_trace
.event("op applied");
596 op
->waiting_for_applied
.erase(get_parent()->whoami_shard());
597 parent
->op_applied(op
->v
);
599 if (op
->waiting_for_applied
.empty()) {
600 op
->on_applied
->complete(0);
604 assert(!op
->on_commit
&& !op
->on_applied
);
605 in_progress_ops
.erase(op
->tid
);
609 void ReplicatedBackend::op_commit(
613 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
614 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
616 op
->op
->mark_event("op_commit");
617 op
->op
->pg_trace
.event("op commit");
620 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
622 if (op
->waiting_for_commit
.empty()) {
623 op
->on_commit
->complete(0);
627 assert(!op
->on_commit
&& !op
->on_applied
);
628 in_progress_ops
.erase(op
->tid
);
632 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
634 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
635 const MOSDRepOpReply
*r
= static_cast<const MOSDRepOpReply
*>(op
->get_req());
636 assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
640 // must be replication.
641 ceph_tid_t rep_tid
= r
->get_tid();
642 pg_shard_t from
= r
->from
;
644 if (in_progress_ops
.count(rep_tid
)) {
645 map
<ceph_tid_t
, InProgressOp
>::iterator iter
=
646 in_progress_ops
.find(rep_tid
);
647 InProgressOp
&ip_op
= iter
->second
;
648 const MOSDOp
*m
= NULL
;
650 m
= static_cast<const MOSDOp
*>(ip_op
.op
->get_req());
653 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
654 << " ack_type " << (int)r
->ack_type
658 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
659 << " ack_type " << (int)r
->ack_type
665 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
666 assert(ip_op
.waiting_for_commit
.count(from
));
667 ip_op
.waiting_for_commit
.erase(from
);
670 ss
<< "sub_op_commit_rec from " << from
;
671 ip_op
.op
->mark_event_string(ss
.str());
672 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
675 assert(ip_op
.waiting_for_applied
.count(from
));
678 ss
<< "sub_op_applied_rec from " << from
;
679 ip_op
.op
->mark_event_string(ss
.str());
680 ip_op
.op
->pg_trace
.event("sub_op_applied_rec");
683 ip_op
.waiting_for_applied
.erase(from
);
685 parent
->update_peer_last_complete_ondisk(
687 r
->get_last_complete_ondisk());
689 if (ip_op
.waiting_for_applied
.empty() &&
691 ip_op
.on_applied
->complete(0);
692 ip_op
.on_applied
= 0;
694 if (ip_op
.waiting_for_commit
.empty() &&
696 ip_op
.on_commit
->complete(0);
700 assert(!ip_op
.on_commit
&& !ip_op
.on_applied
);
701 in_progress_ops
.erase(iter
);
706 void ReplicatedBackend::be_deep_scrub(
707 const hobject_t
&poid
,
710 ThreadPool::TPHandle
&handle
)
712 dout(10) << __func__
<< " " << poid
<< " seed "
713 << std::hex
<< seed
<< std::dec
<< dendl
;
714 bufferhash
h(seed
), oh(seed
);
715 bufferlist bl
, hdrbl
;
719 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
| CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
722 handle
.reset_tp_timeout();
726 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
728 cct
->_conf
->osd_deep_scrub_stride
, bl
,
738 dout(25) << __func__
<< " " << poid
<< " got "
739 << r
<< " on read, read_error" << dendl
;
743 o
.digest
= h
.digest();
744 o
.digest_present
= true;
747 r
= store
->omap_get_header(
750 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
752 // NOTE: bobtail to giant, we would crc the head as (len, head).
753 // that changes at the same time we start using a non-zero seed.
754 if (r
== 0 && hdrbl
.length()) {
755 dout(25) << "CRC header " << string(hdrbl
.c_str(), hdrbl
.length())
765 } else if (r
== -EIO
) {
766 dout(25) << __func__
<< " " << poid
<< " got "
767 << r
<< " on omap header read, read_error" << dendl
;
772 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
775 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
777 for (iter
->seek_to_first(); iter
->status() == 0 && iter
->valid();
779 handle
.reset_tp_timeout();
781 dout(25) << "CRC key " << iter
->key() << " value:\n";
782 iter
->value().hexdump(*_dout
);
785 ::encode(iter
->key(), bl
);
786 ::encode(iter
->value(), bl
);
791 if (iter
->status() < 0) {
792 dout(25) << __func__
<< " " << poid
793 << " on omap scan, db status error" << dendl
;
798 //Store final calculated CRC32 of omap header & key/values
799 o
.omap_digest
= oh
.digest();
800 o
.omap_digest_present
= true;
801 dout(20) << __func__
<< " " << poid
<< " omap_digest "
802 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
805 void ReplicatedBackend::_do_push(OpRequestRef op
)
807 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
808 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
809 pg_shard_t from
= m
->from
;
813 vector
<PushReplyOp
> replies
;
814 ObjectStore::Transaction t
;
816 if (get_parent()->check_failsafe_full(ss
)) {
817 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
820 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
821 i
!= m
->pushes
.end();
823 replies
.push_back(PushReplyOp());
824 handle_push(from
, *i
, &(replies
.back()), &t
);
827 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
828 reply
->from
= get_parent()->whoami_shard();
829 reply
->set_priority(m
->get_priority());
830 reply
->pgid
= get_info().pgid
;
831 reply
->map_epoch
= m
->map_epoch
;
832 reply
->min_epoch
= m
->min_epoch
;
833 reply
->replies
.swap(replies
);
834 reply
->compute_cost(cct
);
836 t
.register_on_complete(
837 new PG_SendMessageOnConn(
838 get_parent(), reply
, m
->get_connection()));
840 get_parent()->queue_transaction(std::move(t
));
843 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
844 ReplicatedBackend
*bc
;
845 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
847 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
848 : bc(bc
), priority(priority
) {}
850 void finish(ThreadPool::TPHandle
&handle
) override
{
851 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
852 for (auto &&i
: to_continue
) {
853 auto j
= bc
->pulling
.find(i
.hoid
);
854 assert(j
!= bc
->pulling
.end());
855 ObjectContextRef obc
= j
->second
.obc
;
856 bc
->clear_pull(j
, false /* already did it */);
857 int started
= bc
->start_pushes(i
.hoid
, obc
, h
);
859 bc
->pushing
[i
.hoid
].clear();
860 bc
->get_parent()->primary_failed(i
.hoid
);
861 bc
->get_parent()->primary_error(i
.hoid
, obc
->obs
.oi
.version
);
862 } else if (!started
) {
863 bc
->get_parent()->on_global_recover(
864 i
.hoid
, i
.stat
, false);
866 handle
.reset_tp_timeout();
868 bc
->run_recovery_op(h
, priority
);
872 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
874 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
875 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
876 pg_shard_t from
= m
->from
;
880 vector
<PullOp
> replies(1);
883 if (get_parent()->check_failsafe_full(ss
)) {
884 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push): " << ss
.str() << dendl
;
888 ObjectStore::Transaction t
;
889 list
<pull_complete_info
> to_continue
;
890 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
891 i
!= m
->pushes
.end();
893 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
895 replies
.push_back(PullOp());
897 if (!to_continue
.empty()) {
898 C_ReplicatedBackend_OnPullComplete
*c
=
899 new C_ReplicatedBackend_OnPullComplete(
902 c
->to_continue
.swap(to_continue
);
903 t
.register_on_complete(
904 new PG_RecoveryQueueAsync(
906 get_parent()->bless_gencontext(c
)));
908 replies
.erase(replies
.end() - 1);
910 if (replies
.size()) {
911 MOSDPGPull
*reply
= new MOSDPGPull
;
912 reply
->from
= parent
->whoami_shard();
913 reply
->set_priority(m
->get_priority());
914 reply
->pgid
= get_info().pgid
;
915 reply
->map_epoch
= m
->map_epoch
;
916 reply
->min_epoch
= m
->min_epoch
;
917 reply
->set_pulls(&replies
);
918 reply
->compute_cost(cct
);
920 t
.register_on_complete(
921 new PG_SendMessageOnConn(
922 get_parent(), reply
, m
->get_connection()));
925 get_parent()->queue_transaction(std::move(t
));
928 void ReplicatedBackend::do_pull(OpRequestRef op
)
930 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
931 assert(m
->get_type() == MSG_OSD_PG_PULL
);
932 pg_shard_t from
= m
->from
;
934 map
<pg_shard_t
, vector
<PushOp
> > replies
;
935 vector
<PullOp
> pulls
;
936 m
->take_pulls(&pulls
);
937 for (auto& i
: pulls
) {
938 replies
[from
].push_back(PushOp());
939 handle_pull(from
, i
, &(replies
[from
].back()));
941 send_pushes(m
->get_priority(), replies
);
944 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
946 const MOSDPGPushReply
*m
= static_cast<const MOSDPGPushReply
*>(op
->get_req());
947 assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
948 pg_shard_t from
= m
->from
;
950 vector
<PushOp
> replies(1);
951 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
952 i
!= m
->replies
.end();
954 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
956 replies
.push_back(PushOp());
958 replies
.erase(replies
.end() - 1);
960 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
961 _replies
[from
].swap(replies
);
962 send_pushes(m
->get_priority(), _replies
);
965 Message
* ReplicatedBackend::generate_subop(
966 const hobject_t
&soid
,
967 const eversion_t
&at_version
,
970 eversion_t pg_trim_to
,
971 eversion_t pg_roll_forward_to
,
972 hobject_t new_temp_oid
,
973 hobject_t discard_temp_oid
,
974 const vector
<pg_log_entry_t
> &log_entries
,
975 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
976 ObjectStore::Transaction
&op_t
,
978 const pg_info_t
&pinfo
)
980 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
981 // forward the write/update/whatever
982 MOSDRepOp
*wr
= new MOSDRepOp(
983 reqid
, parent
->whoami_shard(),
984 spg_t(get_info().pgid
.pgid
, peer
.shard
),
986 get_osdmap()->get_epoch(),
987 parent
->get_last_peering_reset_epoch(),
990 // ship resulting transaction, log entries, and pg_stats
991 if (!parent
->should_send_op(peer
, soid
)) {
992 dout(10) << "issue_repop shipping empty opt to osd." << peer
993 <<", object " << soid
994 << " beyond MAX(last_backfill_started "
995 << ", pinfo.last_backfill "
996 << pinfo
.last_backfill
<< ")" << dendl
;
997 ObjectStore::Transaction t
;
998 ::encode(t
, wr
->get_data());
1000 ::encode(op_t
, wr
->get_data());
1001 wr
->get_header().data_off
= op_t
.get_data_alignment();
1004 ::encode(log_entries
, wr
->logbl
);
1006 if (pinfo
.is_incomplete())
1007 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
1009 wr
->pg_stats
= get_info().stats
;
1011 wr
->pg_trim_to
= pg_trim_to
;
1012 wr
->pg_roll_forward_to
= pg_roll_forward_to
;
1014 wr
->new_temp_oid
= new_temp_oid
;
1015 wr
->discard_temp_oid
= discard_temp_oid
;
1016 wr
->updated_hit_set_history
= hset_hist
;
1020 void ReplicatedBackend::issue_op(
1021 const hobject_t
&soid
,
1022 const eversion_t
&at_version
,
1025 eversion_t pg_trim_to
,
1026 eversion_t pg_roll_forward_to
,
1027 hobject_t new_temp_oid
,
1028 hobject_t discard_temp_oid
,
1029 const vector
<pg_log_entry_t
> &log_entries
,
1030 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1032 ObjectStore::Transaction
&op_t
)
1035 op
->op
->pg_trace
.event("issue replication ops");
1037 if (parent
->get_actingbackfill_shards().size() > 1) {
1039 set
<pg_shard_t
> replicas
= parent
->get_actingbackfill_shards();
1040 replicas
.erase(parent
->whoami_shard());
1041 ss
<< "waiting for subops from " << replicas
;
1043 op
->op
->mark_sub_op_sent(ss
.str());
1045 for (set
<pg_shard_t
>::const_iterator i
=
1046 parent
->get_actingbackfill_shards().begin();
1047 i
!= parent
->get_actingbackfill_shards().end();
1049 if (*i
== parent
->whoami_shard()) continue;
1050 pg_shard_t peer
= *i
;
1051 const pg_info_t
&pinfo
= parent
->get_shard_info().find(peer
)->second
;
1054 wr
= generate_subop(
1069 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1070 get_parent()->send_message_osd_cluster(
1071 peer
.osd
, wr
, get_osdmap()->get_epoch());
1076 void ReplicatedBackend::do_repop(OpRequestRef op
)
1078 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1079 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(op
->get_req());
1080 int msg_type
= m
->get_type();
1081 assert(MSG_OSD_REPOP
== msg_type
);
1083 const hobject_t
& soid
= m
->poid
;
1085 dout(10) << __func__
<< " " << soid
1086 << " v " << m
->version
1087 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1088 << " " << m
->logbl
.length()
1092 assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1094 // we better not be missing this.
1095 assert(!parent
->get_log().get_missing().is_missing(soid
));
1097 int ackerosd
= m
->get_source().num();
1101 RepModifyRef
rm(std::make_shared
<RepModify
>());
1103 rm
->ackerosd
= ackerosd
;
1104 rm
->last_complete
= get_info().last_complete
;
1105 rm
->epoch_started
= get_osdmap()->get_epoch();
1107 assert(m
->logbl
.length());
1108 // shipped transaction and log entries
1109 vector
<pg_log_entry_t
> log
;
1111 bufferlist::iterator p
= const_cast<bufferlist
&>(m
->get_data()).begin();
1112 ::decode(rm
->opt
, p
);
1114 if (m
->new_temp_oid
!= hobject_t()) {
1115 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1116 add_temp_obj(m
->new_temp_oid
);
1118 if (m
->discard_temp_oid
!= hobject_t()) {
1119 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1120 if (rm
->opt
.empty()) {
1121 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1122 << " since we won't get the transaction" << dendl
;
1123 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1125 clear_temp_obj(m
->discard_temp_oid
);
1128 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1130 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1132 bool update_snaps
= false;
1133 if (!rm
->opt
.empty()) {
1134 // If the opt is non-empty, we infer we are before
1135 // last_backfill (according to the primary, not our
1136 // not-quite-accurate value), and should update the
1137 // collections now. Otherwise, we do it later on push.
1138 update_snaps
= true;
1140 parent
->update_stats(m
->pg_stats
);
1141 parent
->log_operation(
1143 m
->updated_hit_set_history
,
1145 m
->pg_roll_forward_to
,
1149 rm
->opt
.register_on_commit(
1150 parent
->bless_context(
1151 new C_OSD_RepModifyCommit(this, rm
)));
1152 rm
->localt
.register_on_applied(
1153 parent
->bless_context(
1154 new C_OSD_RepModifyApply(this, rm
)));
1155 vector
<ObjectStore::Transaction
> tls
;
1157 tls
.push_back(std::move(rm
->localt
));
1158 tls
.push_back(std::move(rm
->opt
));
1159 parent
->queue_transactions(tls
, op
);
1160 // op is cleaned up by oncommit/onapply when both are executed
1163 void ReplicatedBackend::repop_applied(RepModifyRef rm
)
1165 rm
->op
->mark_event("sub_op_applied");
1167 rm
->op
->pg_trace
.event("sup_op_applied");
1169 dout(10) << __func__
<< " on " << rm
<< " op "
1170 << *rm
->op
->get_req() << dendl
;
1171 const Message
*m
= rm
->op
->get_req();
1172 const MOSDRepOp
*req
= static_cast<const MOSDRepOp
*>(m
);
1173 eversion_t version
= req
->version
;
1175 // send ack to acker only if we haven't sent a commit already
1176 if (!rm
->committed
) {
1177 Message
*ack
= new MOSDRepOpReply(
1178 req
, parent
->whoami_shard(),
1179 0, get_osdmap()->get_epoch(), req
->min_epoch
, CEPH_OSD_FLAG_ACK
);
1180 ack
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match commit priority!
1181 ack
->trace
= rm
->op
->pg_trace
;
1182 get_parent()->send_message_osd_cluster(
1183 rm
->ackerosd
, ack
, get_osdmap()->get_epoch());
1186 parent
->op_applied(version
);
1189 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1191 rm
->op
->mark_commit_sent();
1192 rm
->op
->pg_trace
.event("sup_op_commit");
1193 rm
->committed
= true;
1196 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(rm
->op
->get_req());
1197 assert(m
->get_type() == MSG_OSD_REPOP
);
1198 dout(10) << __func__
<< " on op " << *m
1199 << ", sending commit to osd." << rm
->ackerosd
1201 assert(get_osdmap()->is_up(rm
->ackerosd
));
1203 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1205 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1207 get_parent()->whoami_shard(),
1208 0, get_osdmap()->get_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1209 reply
->set_last_complete_ondisk(rm
->last_complete
);
1210 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1211 reply
->trace
= rm
->op
->pg_trace
;
1212 get_parent()->send_message_osd_cluster(
1213 rm
->ackerosd
, reply
, get_osdmap()->get_epoch());
1215 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1219 // ===========================================================
1221 void ReplicatedBackend::calc_head_subsets(
1222 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1223 const pg_missing_t
& missing
,
1224 const hobject_t
&last_backfill
,
1225 interval_set
<uint64_t>& data_subset
,
1226 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1227 ObcLockManager
&manager
)
1229 dout(10) << "calc_head_subsets " << head
1230 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1232 uint64_t size
= obc
->obs
.oi
.size
;
1234 data_subset
.insert(0, size
);
1236 if (get_parent()->get_pool().allow_incomplete_clones()) {
1237 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1241 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1242 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1247 interval_set
<uint64_t> cloning
;
1248 interval_set
<uint64_t> prev
;
1250 prev
.insert(0, size
);
1252 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1254 c
.snap
= snapset
.clones
[j
];
1255 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1256 if (!missing
.is_missing(c
) &&
1257 c
< last_backfill
&&
1258 get_parent()->try_lock_for_read(c
, manager
)) {
1259 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1260 << " overlap " << prev
<< dendl
;
1261 clone_subsets
[c
] = prev
;
1262 cloning
.union_of(prev
);
1265 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1266 << " overlap " << prev
<< dendl
;
1270 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1271 dout(10) << "skipping clone, too many holes" << dendl
;
1272 get_parent()->release_locks(manager
);
1273 clone_subsets
.clear();
1277 // what's left for us to push?
1278 data_subset
.subtract(cloning
);
1280 dout(10) << "calc_head_subsets " << head
1281 << " data_subset " << data_subset
1282 << " clone_subsets " << clone_subsets
<< dendl
;
1285 void ReplicatedBackend::calc_clone_subsets(
1286 SnapSet
& snapset
, const hobject_t
& soid
,
1287 const pg_missing_t
& missing
,
1288 const hobject_t
&last_backfill
,
1289 interval_set
<uint64_t>& data_subset
,
1290 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1291 ObcLockManager
&manager
)
1293 dout(10) << "calc_clone_subsets " << soid
1294 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1296 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1298 data_subset
.insert(0, size
);
1300 if (get_parent()->get_pool().allow_incomplete_clones()) {
1301 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1305 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1306 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1311 for (i
=0; i
< snapset
.clones
.size(); i
++)
1312 if (snapset
.clones
[i
] == soid
.snap
)
1315 // any overlap with next older clone?
1316 interval_set
<uint64_t> cloning
;
1317 interval_set
<uint64_t> prev
;
1319 prev
.insert(0, size
);
1320 for (int j
=i
-1; j
>=0; j
--) {
1322 c
.snap
= snapset
.clones
[j
];
1323 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1324 if (!missing
.is_missing(c
) &&
1325 c
< last_backfill
&&
1326 get_parent()->try_lock_for_read(c
, manager
)) {
1327 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1328 << " overlap " << prev
<< dendl
;
1329 clone_subsets
[c
] = prev
;
1330 cloning
.union_of(prev
);
1333 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1334 << " overlap " << prev
<< dendl
;
1337 // overlap with next newest?
1338 interval_set
<uint64_t> next
;
1340 next
.insert(0, size
);
1341 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1343 c
.snap
= snapset
.clones
[j
];
1344 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1345 if (!missing
.is_missing(c
) &&
1346 c
< last_backfill
&&
1347 get_parent()->try_lock_for_read(c
, manager
)) {
1348 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1349 << " overlap " << next
<< dendl
;
1350 clone_subsets
[c
] = next
;
1351 cloning
.union_of(next
);
1354 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1355 << " overlap " << next
<< dendl
;
1358 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1359 dout(10) << "skipping clone, too many holes" << dendl
;
1360 get_parent()->release_locks(manager
);
1361 clone_subsets
.clear();
1366 // what's left for us to push?
1367 data_subset
.subtract(cloning
);
1369 dout(10) << "calc_clone_subsets " << soid
1370 << " data_subset " << data_subset
1371 << " clone_subsets " << clone_subsets
<< dendl
;
1374 void ReplicatedBackend::prepare_pull(
1376 const hobject_t
& soid
,
1377 ObjectContextRef headctx
,
1380 assert(get_parent()->get_local_missing().get_items().count(soid
));
1381 eversion_t _v
= get_parent()->get_local_missing().get_items().find(
1384 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1385 get_parent()->get_missing_loc_shards());
1386 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1387 get_parent()->get_shard_missing());
1388 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1389 assert(q
!= missing_loc
.end());
1390 assert(!q
->second
.empty());
1393 vector
<pg_shard_t
> shuffle(q
->second
.begin(), q
->second
.end());
1394 random_shuffle(shuffle
.begin(), shuffle
.end());
1395 vector
<pg_shard_t
>::iterator p
= shuffle
.begin();
1396 assert(get_osdmap()->is_up(p
->osd
));
1397 pg_shard_t fromshard
= *p
;
1399 dout(7) << "pull " << soid
1401 << " on osds " << q
->second
1402 << " from osd." << fromshard
1405 assert(peer_missing
.count(fromshard
));
1406 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1407 if (pmissing
.is_missing(soid
, v
)) {
1408 assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1409 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1410 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1411 << " rather than at version " << v
<< dendl
;
1412 v
= pmissing
.get_items().find(soid
)->second
.have
;
1413 assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1414 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1415 pg_log_entry_t::LOST_REVERT
) &&
1416 (get_parent()->get_log().get_log().objects
.find(
1417 soid
)->second
->reverting_to
==
1421 ObjectRecoveryInfo recovery_info
;
1422 ObcLockManager lock_manager
;
1424 if (soid
.is_snap()) {
1425 assert(!get_parent()->get_local_missing().is_missing(
1427 !get_parent()->get_local_missing().is_missing(
1428 soid
.get_snapdir()));
1431 SnapSetContext
*ssc
= headctx
->ssc
;
1433 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1434 recovery_info
.ss
= ssc
->snapset
;
1436 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1437 get_info().last_backfill
,
1438 recovery_info
.copy_subset
,
1439 recovery_info
.clone_subset
,
1441 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1442 dout(10) << " pulling " << recovery_info
<< dendl
;
1444 assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1445 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1447 // pulling head or unversioned object.
1448 // always pull the whole thing.
1449 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1450 recovery_info
.size
= ((uint64_t)-1);
1453 h
->pulls
[fromshard
].push_back(PullOp());
1454 PullOp
&op
= h
->pulls
[fromshard
].back();
1457 op
.recovery_info
= recovery_info
;
1458 op
.recovery_info
.soid
= soid
;
1459 op
.recovery_info
.version
= v
;
1460 op
.recovery_progress
.data_complete
= false;
1461 op
.recovery_progress
.omap_complete
= false;
1462 op
.recovery_progress
.data_recovered_to
= 0;
1463 op
.recovery_progress
.first
= true;
1465 assert(!pulling
.count(soid
));
1466 pull_from_peer
[fromshard
].insert(soid
);
1467 PullInfo
&pi
= pulling
[soid
];
1468 pi
.from
= fromshard
;
1470 pi
.head_ctx
= headctx
;
1471 pi
.recovery_info
= op
.recovery_info
;
1472 pi
.recovery_progress
= op
.recovery_progress
;
1473 pi
.cache_dont_need
= h
->cache_dont_need
;
1474 pi
.lock_manager
= std::move(lock_manager
);
1478 * intelligently push an object to a replica. make use of existing
1479 * clones/heads and dup data ranges where possible.
1481 int ReplicatedBackend::prep_push_to_replica(
1482 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1483 PushOp
*pop
, bool cache_dont_need
)
1485 const object_info_t
& oi
= obc
->obs
.oi
;
1486 uint64_t size
= obc
->obs
.oi
.size
;
1488 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1489 << " size " << size
<< " to osd." << peer
<< dendl
;
1491 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1492 interval_set
<uint64_t> data_subset
;
1494 ObcLockManager lock_manager
;
1495 // are we doing a clone on the replica?
1496 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1497 hobject_t head
= soid
;
1498 head
.snap
= CEPH_NOSNAP
;
1500 // try to base push off of clones that succeed/preceed poid
1501 // we need the head (and current SnapSet) locally to do that.
1502 if (get_parent()->get_local_missing().is_missing(head
)) {
1503 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1504 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1506 hobject_t snapdir
= head
;
1507 snapdir
.snap
= CEPH_SNAPDIR
;
1508 if (get_parent()->get_local_missing().is_missing(snapdir
)) {
1509 dout(15) << "push_to_replica missing snapdir " << snapdir
1510 << ", pushing raw clone" << dendl
;
1511 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1514 SnapSetContext
*ssc
= obc
->ssc
;
1516 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1517 pop
->recovery_info
.ss
= ssc
->snapset
;
1518 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1519 get_parent()->get_shard_missing().find(peer
);
1520 assert(pm
!= get_parent()->get_shard_missing().end());
1521 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1522 get_parent()->get_shard_info().find(peer
);
1523 assert(pi
!= get_parent()->get_shard_info().end());
1527 pi
->second
.last_backfill
,
1528 data_subset
, clone_subsets
,
1530 } else if (soid
.snap
== CEPH_NOSNAP
) {
1531 // pushing head or unversioned object.
1532 // base this on partially on replica's clones?
1533 SnapSetContext
*ssc
= obc
->ssc
;
1535 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1538 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1539 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1540 data_subset
, clone_subsets
,
1553 std::move(lock_manager
));
1556 int ReplicatedBackend::prep_push(ObjectContextRef obc
,
1557 const hobject_t
& soid
, pg_shard_t peer
,
1558 PushOp
*pop
, bool cache_dont_need
)
1560 interval_set
<uint64_t> data_subset
;
1561 if (obc
->obs
.oi
.size
)
1562 data_subset
.insert(0, obc
->obs
.oi
.size
);
1563 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1565 return prep_push(obc
, soid
, peer
,
1566 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1567 pop
, cache_dont_need
, ObcLockManager());
1570 int ReplicatedBackend::prep_push(
1571 ObjectContextRef obc
,
1572 const hobject_t
& soid
, pg_shard_t peer
,
1574 interval_set
<uint64_t> &data_subset
,
1575 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1577 bool cache_dont_need
,
1578 ObcLockManager
&&lock_manager
)
1580 get_parent()->begin_peer_recover(peer
, soid
);
1582 PushInfo
&pi
= pushing
[soid
][peer
];
1584 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1585 pi
.recovery_info
.copy_subset
= data_subset
;
1586 pi
.recovery_info
.clone_subset
= clone_subsets
;
1587 pi
.recovery_info
.soid
= soid
;
1588 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1589 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1590 pi
.recovery_info
.version
= version
;
1591 pi
.lock_manager
= std::move(lock_manager
);
1593 ObjectRecoveryProgress new_progress
;
1594 int r
= build_push_op(pi
.recovery_info
,
1595 pi
.recovery_progress
,
1598 &(pi
.stat
), cache_dont_need
);
1601 pi
.recovery_progress
= new_progress
;
1605 void ReplicatedBackend::submit_push_data(
1606 const ObjectRecoveryInfo
&recovery_info
,
1609 bool cache_dont_need
,
1610 const interval_set
<uint64_t> &intervals_included
,
1611 bufferlist data_included
,
1612 bufferlist omap_header
,
1613 const map
<string
, bufferlist
> &attrs
,
1614 const map
<string
, bufferlist
> &omap_entries
,
1615 ObjectStore::Transaction
*t
)
1617 hobject_t target_oid
;
1618 if (first
&& complete
) {
1619 target_oid
= recovery_info
.soid
;
1621 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1622 recovery_info
.version
);
1624 dout(10) << __func__
<< ": Adding oid "
1625 << target_oid
<< " in the temp collection" << dendl
;
1626 add_temp_obj(target_oid
);
1631 t
->remove(coll
, ghobject_t(target_oid
));
1632 t
->touch(coll
, ghobject_t(target_oid
));
1633 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1634 if (omap_header
.length())
1635 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
1637 bufferlist bv
= attrs
.at(OI_ATTR
);
1638 object_info_t
oi(bv
);
1639 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1640 oi
.expected_object_size
,
1641 oi
.expected_write_size
,
1642 oi
.alloc_hint_flags
);
1645 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1646 if (cache_dont_need
)
1647 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1648 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1649 p
!= intervals_included
.end();
1652 bit
.substr_of(data_included
, off
, p
.get_len());
1653 t
->write(coll
, ghobject_t(target_oid
),
1654 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1658 if (!omap_entries
.empty())
1659 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1661 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1665 dout(10) << __func__
<< ": Removing oid "
1666 << target_oid
<< " from the temp collection" << dendl
;
1667 clear_temp_obj(target_oid
);
1668 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1669 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1670 coll
, ghobject_t(recovery_info
.soid
));
1673 submit_push_complete(recovery_info
, t
);
1677 void ReplicatedBackend::submit_push_complete(
1678 const ObjectRecoveryInfo
&recovery_info
,
1679 ObjectStore::Transaction
*t
)
1681 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1682 recovery_info
.clone_subset
.begin();
1683 p
!= recovery_info
.clone_subset
.end();
1685 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1686 q
!= p
->second
.end();
1688 dout(15) << " clone_range " << p
->first
<< " "
1689 << q
.get_start() << "~" << q
.get_len() << dendl
;
1690 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1691 q
.get_start(), q
.get_len(), q
.get_start());
1696 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1697 const ObjectRecoveryInfo
& recovery_info
,
1698 SnapSetContext
*ssc
,
1699 ObcLockManager
&manager
)
1701 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1702 return recovery_info
;
1703 ObjectRecoveryInfo new_info
= recovery_info
;
1704 new_info
.copy_subset
.clear();
1705 new_info
.clone_subset
.clear();
1707 get_parent()->release_locks(manager
); // might already have locks
1709 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1710 get_info().last_backfill
,
1711 new_info
.copy_subset
, new_info
.clone_subset
,
1716 bool ReplicatedBackend::handle_pull_response(
1717 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1718 list
<pull_complete_info
> *to_continue
,
1719 ObjectStore::Transaction
*t
)
1721 interval_set
<uint64_t> data_included
= pop
.data_included
;
1724 dout(10) << "handle_pull_response "
1725 << pop
.recovery_info
1726 << pop
.after_progress
1727 << " data.size() is " << data
.length()
1728 << " data_included: " << data_included
1730 if (pop
.version
== eversion_t()) {
1731 // replica doesn't have it!
1732 _failed_pull(from
, pop
.soid
);
1736 const hobject_t
&hoid
= pop
.soid
;
1737 assert((data_included
.empty() && data
.length() == 0) ||
1738 (!data_included
.empty() && data
.length() > 0));
1740 auto piter
= pulling
.find(hoid
);
1741 if (piter
== pulling
.end()) {
1745 PullInfo
&pi
= piter
->second
;
1746 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1747 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1748 pi
.recovery_info
.copy_subset
.intersection_of(
1749 pop
.recovery_info
.copy_subset
);
1751 // If primary doesn't have object info and didn't know version
1752 if (pi
.recovery_info
.version
== eversion_t()) {
1753 pi
.recovery_info
.version
= pop
.version
;
1756 bool first
= pi
.recovery_progress
.first
;
1758 // attrs only reference the origin bufferlist (decode from
1759 // MOSDPGPush message) whose size is much greater than attrs in
1760 // recovery. If obc cache it (get_obc maybe cache the attr), this
1761 // causes the whole origin bufferlist would not be free until obc
1762 // is evicted from obc cache. So rebuild the bufferlists before
1764 auto attrset
= pop
.attrset
;
1765 for (auto& a
: attrset
) {
1768 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1769 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1770 pi
.recovery_info
= recalc_subsets(
1777 interval_set
<uint64_t> usable_intervals
;
1778 bufferlist usable_data
;
1779 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1784 data_included
= usable_intervals
;
1785 data
.claim(usable_data
);
1788 pi
.recovery_progress
= pop
.after_progress
;
1790 dout(10) << "new recovery_info " << pi
.recovery_info
1791 << ", new progress " << pi
.recovery_progress
1794 bool complete
= pi
.is_complete();
1796 submit_push_data(pi
.recovery_info
, first
,
1797 complete
, pi
.cache_dont_need
,
1798 data_included
, data
,
1804 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1805 pi
.stat
.num_bytes_recovered
+= data
.length();
1808 pi
.stat
.num_objects_recovered
++;
1809 clear_pull_from(piter
);
1810 to_continue
->push_back({hoid
, pi
.stat
});
1811 get_parent()->on_local_recover(
1812 hoid
, pi
.recovery_info
, pi
.obc
, false, t
);
1815 response
->soid
= pop
.soid
;
1816 response
->recovery_info
= pi
.recovery_info
;
1817 response
->recovery_progress
= pi
.recovery_progress
;
1822 void ReplicatedBackend::handle_push(
1823 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1824 ObjectStore::Transaction
*t
)
1826 dout(10) << "handle_push "
1827 << pop
.recovery_info
1828 << pop
.after_progress
1832 bool first
= pop
.before_progress
.first
;
1833 bool complete
= pop
.after_progress
.data_complete
&&
1834 pop
.after_progress
.omap_complete
;
1836 response
->soid
= pop
.recovery_info
.soid
;
1837 submit_push_data(pop
.recovery_info
,
1840 true, // must be replicate
1849 get_parent()->on_local_recover(
1850 pop
.recovery_info
.soid
,
1852 ObjectContextRef(), // ok, is replica
1857 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1859 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1862 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1864 get_osdmap()->get_epoch());
1867 vector
<PushOp
>::iterator j
= i
->second
.begin();
1868 while (j
!= i
->second
.end()) {
1870 uint64_t pushes
= 0;
1871 MOSDPGPush
*msg
= new MOSDPGPush();
1872 msg
->from
= get_parent()->whoami_shard();
1873 msg
->pgid
= get_parent()->primary_spg_t();
1874 msg
->map_epoch
= get_osdmap()->get_epoch();
1875 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1876 msg
->set_priority(prio
);
1878 (j
!= i
->second
.end() &&
1879 cost
< cct
->_conf
->osd_max_push_cost
&&
1880 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1882 dout(20) << __func__
<< ": sending push " << *j
1883 << " to osd." << i
->first
<< dendl
;
1884 cost
+= j
->cost(cct
);
1886 msg
->pushes
.push_back(*j
);
1888 msg
->set_cost(cost
);
1889 get_parent()->send_message_osd_cluster(msg
, con
);
1894 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1896 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1899 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1901 get_osdmap()->get_epoch());
1904 dout(20) << __func__
<< ": sending pulls " << i
->second
1905 << " to osd." << i
->first
<< dendl
;
1906 MOSDPGPull
*msg
= new MOSDPGPull();
1907 msg
->from
= parent
->whoami_shard();
1908 msg
->set_priority(prio
);
1909 msg
->pgid
= get_parent()->primary_spg_t();
1910 msg
->map_epoch
= get_osdmap()->get_epoch();
1911 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1912 msg
->set_pulls(&i
->second
);
1913 msg
->compute_cost(cct
);
1914 get_parent()->send_message_osd_cluster(msg
, con
);
1918 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1919 const ObjectRecoveryProgress
&progress
,
1920 ObjectRecoveryProgress
*out_progress
,
1922 object_stat_sum_t
*stat
,
1923 bool cache_dont_need
)
1925 ObjectRecoveryProgress _new_progress
;
1927 out_progress
= &_new_progress
;
1928 ObjectRecoveryProgress
&new_progress
= *out_progress
;
1929 new_progress
= progress
;
1931 dout(7) << __func__
<< " " << recovery_info
.soid
1932 << " v " << recovery_info
.version
1933 << " size " << recovery_info
.size
1934 << " recovery_info: " << recovery_info
1937 eversion_t v
= recovery_info
.version
;
1938 if (progress
.first
) {
1939 int r
= store
->omap_get_header(coll
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
1941 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
1944 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
1946 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
1951 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
1954 bufferlist::iterator bliter
= bv
.begin();
1955 ::decode(oi
, bliter
);
1957 dout(0) << __func__
<< ": bad object_info_t: " << recovery_info
.soid
<< dendl
;
1961 // If requestor didn't know the version, use ours
1962 if (v
== eversion_t()) {
1964 } else if (oi
.version
!= v
) {
1965 get_parent()->clog_error() << get_info().pgid
<< " push "
1966 << recovery_info
.soid
<< " v "
1967 << recovery_info
.version
1968 << " failed because local copy is "
1973 new_progress
.first
= false;
1975 // Once we provide the version subsequent requests will have it, so
1976 // at this point it must be known.
1977 assert(v
!= eversion_t());
1979 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
1980 if (!progress
.omap_complete
) {
1981 ObjectMap::ObjectMapIterator iter
=
1982 store
->get_omap_iterator(coll
,
1983 ghobject_t(recovery_info
.soid
));
1985 for (iter
->lower_bound(progress
.omap_recovered_to
);
1987 iter
->next(false)) {
1988 if (!out_op
->omap_entries
.empty() &&
1989 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
1990 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
1991 available
<= iter
->key().size() + iter
->value().length()))
1993 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
1995 if ((iter
->key().size() + iter
->value().length()) <= available
)
1996 available
-= (iter
->key().size() + iter
->value().length());
2001 new_progress
.omap_complete
= true;
2003 new_progress
.omap_recovered_to
= iter
->key();
2006 if (available
> 0) {
2007 if (!recovery_info
.copy_subset
.empty()) {
2008 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
2009 map
<uint64_t, uint64_t> m
;
2010 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
2011 copy_subset
.range_end(), m
);
2013 interval_set
<uint64_t> fiemap_included(m
);
2014 copy_subset
.intersection_of(fiemap_included
);
2016 // intersection of copy_subset and empty interval_set would be empty anyway
2017 copy_subset
.clear();
2020 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
2022 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
2023 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2025 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2028 out_op
->data_included
.clear();
2031 for (interval_set
<uint64_t>::iterator p
= out_op
->data_included
.begin();
2032 p
!= out_op
->data_included
.end();
2035 int r
= store
->read(ch
, ghobject_t(recovery_info
.soid
),
2036 p
.get_start(), p
.get_len(), bit
,
2037 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
2038 if (cct
->_conf
->osd_debug_random_push_read_error
&&
2039 (rand() % (int)(cct
->_conf
->osd_debug_random_push_read_error
* 100.0)) == 0) {
2040 dout(0) << __func__
<< ": inject EIO " << recovery_info
.soid
<< dendl
;
2046 if (p
.get_len() != bit
.length()) {
2047 dout(10) << " extent " << p
.get_start() << "~" << p
.get_len()
2048 << " is actually " << p
.get_start() << "~" << bit
.length()
2050 interval_set
<uint64_t>::iterator save
= p
++;
2051 if (bit
.length() == 0)
2052 out_op
->data_included
.erase(save
); //Remove this empty interval
2054 save
.set_len(bit
.length());
2055 // Remove any other intervals present
2056 while (p
!= out_op
->data_included
.end()) {
2057 interval_set
<uint64_t>::iterator save
= p
++;
2058 out_op
->data_included
.erase(save
);
2060 new_progress
.data_complete
= true;
2061 out_op
->data
.claim_append(bit
);
2064 out_op
->data
.claim_append(bit
);
2067 if (new_progress
.is_complete(recovery_info
)) {
2068 new_progress
.data_complete
= true;
2070 stat
->num_objects_recovered
++;
2074 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2075 stat
->num_bytes_recovered
+= out_op
->data
.length();
2078 get_parent()->get_logger()->inc(l_osd_push
);
2079 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2082 out_op
->version
= v
;
2083 out_op
->soid
= recovery_info
.soid
;
2084 out_op
->recovery_info
= recovery_info
;
2085 out_op
->after_progress
= new_progress
;
2086 out_op
->before_progress
= progress
;
2090 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2092 op
->recovery_info
.version
= eversion_t();
2093 op
->version
= eversion_t();
2097 bool ReplicatedBackend::handle_push_reply(
2098 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2100 const hobject_t
&soid
= op
.soid
;
2101 if (pushing
.count(soid
) == 0) {
2102 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2103 << ", or anybody else"
2106 } else if (pushing
[soid
].count(peer
) == 0) {
2107 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2111 PushInfo
*pi
= &pushing
[soid
][peer
];
2112 bool error
= pushing
[soid
].begin()->second
.recovery_progress
.error
;
2114 if (!pi
->recovery_progress
.data_complete
&& !error
) {
2115 dout(10) << " pushing more from, "
2116 << pi
->recovery_progress
.data_recovered_to
2117 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2118 ObjectRecoveryProgress new_progress
;
2119 int r
= build_push_op(
2121 pi
->recovery_progress
, &new_progress
, reply
,
2123 // Handle the case of a read error right after we wrote, which is
2124 // hopefuilly extremely rare.
2126 dout(5) << __func__
<< ": oid " << soid
<< " error " << r
<< dendl
;
2131 pi
->recovery_progress
= new_progress
;
2137 get_parent()->on_peer_recover( peer
, soid
, pi
->recovery_info
);
2139 get_parent()->release_locks(pi
->lock_manager
);
2140 object_stat_sum_t stat
= pi
->stat
;
2141 eversion_t v
= pi
->recovery_info
.version
;
2142 pushing
[soid
].erase(peer
);
2145 if (pushing
[soid
].empty()) {
2147 get_parent()->on_global_recover(soid
, stat
, false);
2149 get_parent()->on_primary_error(soid
, v
);
2150 pushing
.erase(soid
);
2152 // This looks weird, but we erased the current peer and need to remember
2153 // the error on any other one, while getting more acks.
2155 pushing
[soid
].begin()->second
.recovery_progress
.error
= true;
2156 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2157 << pushing
[soid
].size() << " others" << dendl
;
2164 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2166 const hobject_t
&soid
= op
.soid
;
2168 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2170 get_parent()->clog_error() << get_info().pgid
<< " "
2171 << peer
<< " tried to pull " << soid
2172 << " but got " << cpp_strerror(-r
);
2173 prep_push_op_blank(soid
, reply
);
2175 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2176 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2177 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2178 // Adjust size and copy_subset
2179 recovery_info
.size
= st
.st_size
;
2180 recovery_info
.copy_subset
.clear();
2182 recovery_info
.copy_subset
.insert(0, st
.st_size
);
2183 assert(recovery_info
.clone_subset
.empty());
2186 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2188 prep_push_op_blank(soid
, reply
);
2193 * trim received data to remove what we don't want
2195 * @param copy_subset intervals we want
2196 * @param data_included intervals we got
2197 * @param data_recieved data we got
2198 * @param intervals_usable intervals we want to keep
2199 * @param data_usable matching data we want to keep
2201 void ReplicatedBackend::trim_pushed_data(
2202 const interval_set
<uint64_t> ©_subset
,
2203 const interval_set
<uint64_t> &intervals_received
,
2204 bufferlist data_received
,
2205 interval_set
<uint64_t> *intervals_usable
,
2206 bufferlist
*data_usable
)
2208 if (intervals_received
.subset_of(copy_subset
)) {
2209 *intervals_usable
= intervals_received
;
2210 *data_usable
= data_received
;
2214 intervals_usable
->intersection_of(copy_subset
,
2215 intervals_received
);
2218 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2219 p
!= intervals_received
.end();
2221 interval_set
<uint64_t> x
;
2222 x
.insert(p
.get_start(), p
.get_len());
2223 x
.intersection_of(copy_subset
);
2224 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
2228 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2229 sub
.substr_of(data_received
, data_off
, q
.get_len());
2230 data_usable
->claim_append(sub
);
2236 void ReplicatedBackend::_failed_pull(pg_shard_t from
, const hobject_t
&soid
)
2238 dout(20) << __func__
<< ": " << soid
<< " from " << from
<< dendl
;
2239 list
<pg_shard_t
> fl
= { from
};
2240 get_parent()->failed_push(fl
, soid
);
2242 clear_pull(pulling
.find(soid
));
2245 void ReplicatedBackend::clear_pull_from(
2246 map
<hobject_t
, PullInfo
>::iterator piter
)
2248 auto from
= piter
->second
.from
;
2249 pull_from_peer
[from
].erase(piter
->second
.soid
);
2250 if (pull_from_peer
[from
].empty())
2251 pull_from_peer
.erase(from
);
2254 void ReplicatedBackend::clear_pull(
2255 map
<hobject_t
, PullInfo
>::iterator piter
,
2256 bool clear_pull_from_peer
)
2258 if (clear_pull_from_peer
) {
2259 clear_pull_from(piter
);
2261 get_parent()->release_locks(piter
->second
.lock_manager
);
2262 pulling
.erase(piter
);
2265 int ReplicatedBackend::start_pushes(
2266 const hobject_t
&soid
,
2267 ObjectContextRef obc
,
2270 list
< map
<pg_shard_t
, pg_missing_t
>::const_iterator
> shards
;
2272 dout(20) << __func__
<< " soid " << soid
<< dendl
;
2274 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2275 for (set
<pg_shard_t
>::iterator i
=
2276 get_parent()->get_actingbackfill_shards().begin();
2277 i
!= get_parent()->get_actingbackfill_shards().end();
2279 if (*i
== get_parent()->whoami_shard()) continue;
2280 pg_shard_t peer
= *i
;
2281 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2282 get_parent()->get_shard_missing().find(peer
);
2283 assert(j
!= get_parent()->get_shard_missing().end());
2284 if (j
->second
.is_missing(soid
)) {
2285 shards
.push_back(j
);
2289 // If more than 1 read will occur ignore possible request to not cache
2290 bool cache
= shards
.size() == 1 ? h
->cache_dont_need
: false;
2292 for (auto j
: shards
) {
2293 pg_shard_t peer
= j
->first
;
2294 h
->pushes
[peer
].push_back(PushOp());
2295 int r
= prep_push_to_replica(obc
, soid
, peer
,
2296 &(h
->pushes
[peer
].back()), cache
);
2298 // Back out all failed reads
2299 for (auto k
: shards
) {
2300 pg_shard_t p
= k
->first
;
2301 dout(10) << __func__
<< " clean up peer " << p
<< dendl
;
2302 h
->pushes
[p
].pop_back();
2303 if (p
== peer
) break;
2308 return shards
.size();