1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDSubOp.h"
18 #include "messages/MOSDRepOp.h"
19 #include "messages/MOSDSubOpReply.h"
20 #include "messages/MOSDRepOpReply.h"
21 #include "messages/MOSDPGPush.h"
22 #include "messages/MOSDPGPull.h"
23 #include "messages/MOSDPGPushReply.h"
24 #include "common/EventTrace.h"
// Logging setup for this file: bind the dout() macros to this backend's
// context and the OSD subsystem, and route each message prefix through
// _prefix(_dout, this).
26 #define dout_context cct
27 #define dout_subsys ceph_subsys_osd
28 #define DOUT_PREFIX_ARGS this
30 #define dout_prefix _prefix(_dout, this)
31 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
32 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
// Context that, when finished, hands `reply` to the PG listener's
// send_message_osd_cluster() on the stored connection.
// NOTE(review): the member declarations for `reply`/`conn`, the access
// specifier, the constructor name line and the closing braces are not
// visible in this listing.
36 class PG_SendMessageOnConn
: public Context
{
37 PGBackend::Listener
*pg
;
42 PGBackend::Listener
*pg
,
44 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
// the completion code passed to finish() is ignored
45 void finish(int) override
{
46 pg
->send_message_osd_cluster(reply
, conn
.get());
50 class PG_RecoveryQueueAsync
: public Context
{
51 PGBackend::Listener
*pg
;
52 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener
*pg
,
56 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
57 void finish(int) override
{
58 pg
->schedule_recovery_work(c
.release());
63 struct ReplicatedBackend::C_OSD_RepModifyApply
: public Context
{
64 ReplicatedBackend
*pg
;
66 C_OSD_RepModifyApply(ReplicatedBackend
*pg
, RepModifyRef r
)
68 void finish(int r
) override
{
69 pg
->repop_applied(rm
);
// Completion context constructed from a backend pointer and a RepModifyRef.
// NOTE(review): the body of finish() is not visible in this listing;
// presumably it is the commit counterpart of C_OSD_RepModifyApply -- confirm
// against the full file.
73 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
74 ReplicatedBackend
*pg
;
76 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
78 void finish(int r
) override
{
// Update perf counters for a finished sub-operation.
// Latency is "now" minus the receive stamp of the request carried by `op`.
// The generic sub-op counter and latency are always updated; `subop`
// selects which per-kind counters (write, push or pull) are updated as well.
// NOTE(review): the declaration of `logger` and a few interior lines
// (including the `else` keywords) are not visible in this listing.
83 static void log_subop_stats(
85 OpRequestRef op
, int subop
)
87 utime_t now
= ceph_clock_now();
88 utime_t latency
= now
;
89 latency
-= op
->get_req()->get_recv_stamp();
92 logger
->inc(l_osd_sop
);
93 logger
->tinc(l_osd_sop_lat
, latency
);
// inbound byte counters are only updated for non-pull sub-ops
96 if (subop
!= l_osd_sop_pull
) {
97 uint64_t inb
= op
->get_req()->get_data().length();
98 logger
->inc(l_osd_sop_inb
, inb
);
99 if (subop
== l_osd_sop_w
) {
100 logger
->inc(l_osd_sop_w_inb
, inb
);
101 logger
->tinc(l_osd_sop_w_lat
, latency
);
102 } else if (subop
== l_osd_sop_push
) {
103 logger
->inc(l_osd_sop_push_inb
, inb
);
104 logger
->tinc(l_osd_sop_push_lat
, latency
);
// comparing a string literal with 0 is always false, so this assert always
// fires when reached
106 assert("no support subop" == 0);
108 logger
->tinc(l_osd_sop_pull_lat
, latency
);
// Constructor: forwards cct, the listener, the store, the collection id and
// the collection handle to the PGBackend base; the body is empty.
// NOTE(review): several parameter declarations are not visible in this
// listing.
112 ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener
*pg
,
115 ObjectStore::CollectionHandle
&c
,
118 PGBackend(cct
, pg
, store
, coll
, c
) {}
// Issue the work accumulated in a recovery handle: the handle is downcast
// to RPGHandle and its pushes, pulls and recovery deletes are sent, in that
// order, at `priority`.
// NOTE(review): the `priority` parameter declaration and whatever follows
// the last send are not visible in this listing.
120 void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle
*_h
,
124 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
125 send_pushes(priority
, h
->pushes
);
126 send_pulls(priority
, h
->pulls
);
127 send_recovery_deletes(priority
, h
->deletes
);
// Begin recovery of `hoid`, using the RPGHandle behind `_h`.
// The visible code tests whether `hoid` is in the local missing set, calls
// start_pushes() and clears pushing[hoid].
// NOTE(review): most of both branches is not visible in this listing; it
// looks like a locally missing object is pulled and anything else is pushed,
// with pushing[hoid] cleared when start_pushes() fails -- confirm.
131 int ReplicatedBackend::recover_object(
132 const hobject_t
&hoid
,
134 ObjectContextRef head
,
135 ObjectContextRef obc
,
139 dout(10) << __func__
<< ": " << hoid
<< dendl
;
140 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
141 if (get_parent()->get_local_missing().is_missing(hoid
)) {
151 int started
= start_pushes(
156 pushing
[hoid
].clear();
// Drop pulls whose source OSD is marked down in `osdmap`.
// Walks pull_from_peer; for every peer the map reports as down, each object
// queued from that peer is cancelled with the parent (cancel_pull) and
// removed from `pulling` (clear_pull), then the peer's entry is erased.
// NOTE(review): the iterator-advance lines for the not-down case are not
// visible in this listing.
163 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
165 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
166 i
!= pull_from_peer
.end();
168 if (osdmap
->is_down(i
->first
.osd
)) {
169 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
170 << ", osdmap has it marked down" << dendl
;
171 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
172 j
!= i
->second
.end();
174 get_parent()->cancel_pull(*j
);
175 clear_pull(pulling
.find(*j
), false);
// post-increment keeps `i` usable after the element it pointed at is erased
177 pull_from_peer
.erase(i
++);
// Decide, from the request's message type, whether `op` can be handled
// while inactive. MSG_OSD_PG_PULL is the only case visible here.
// NOTE(review): the return statements are not visible in this listing.
184 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
186 dout(10) << __func__
<< ": " << op
<< dendl
;
187 switch (op
->get_req()->get_type()) {
188 case MSG_OSD_PG_PULL
:
// Message dispatcher: switches on the request's message type, with cases
// for PG push, PG pull, PG push-reply, sub-op, rep-op and rep-op-reply.
// For MSG_OSD_SUBOP the request is viewed as an MOSDSubOp and the code
// checks whether its `ops` vector is empty.
// NOTE(review): the parameter list, the handler called in each case and the
// return values are not visible in this listing.
195 bool ReplicatedBackend::_handle_message(
199 dout(10) << __func__
<< ": " << op
<< dendl
;
200 switch (op
->get_req()->get_type()) {
201 case MSG_OSD_PG_PUSH
:
205 case MSG_OSD_PG_PULL
:
209 case MSG_OSD_PG_PUSH_REPLY
:
213 case MSG_OSD_SUBOP
: {
214 const MOSDSubOp
*m
= static_cast<const MOSDSubOp
*>(op
->get_req());
215 if (m
->ops
.size() == 0) {
221 case MSG_OSD_REPOP
: {
226 case MSG_OSD_REPOPREPLY
: {
// Reset recovery bookkeeping: release the locks held by every entry in
// `pushing` (one lock_manager per inner entry) and in `pulling`, then clear
// pull_from_peer.
// NOTE(review): the statements that clear `pushing` and `pulling`
// themselves are not visible in this listing.
237 void ReplicatedBackend::clear_recovery_state()
239 // clear pushing/pulling maps
240 for (auto &&i
: pushing
) {
241 for (auto &&j
: i
.second
) {
242 get_parent()->release_locks(j
.second
.lock_manager
);
247 for (auto &&i
: pulling
) {
248 get_parent()->release_locks(i
.second
.lock_manager
);
251 pull_from_peer
.clear();
// Discard all in-progress ops: for each entry, delete its still-pending
// on_commit and on_applied callbacks (without completing them), erase the
// entry, and finally clear the recovery state.
254 void ReplicatedBackend::on_change()
256 dout(10) << __func__
<< dendl
;
257 for (map
<ceph_tid_t
, InProgressOp
>::iterator i
= in_progress_ops
.begin();
258 i
!= in_progress_ops
.end();
// the loop "increment" erases the current entry and advances past it
259 in_progress_ops
.erase(i
++)) {
260 if (i
->second
.on_commit
)
261 delete i
->second
.on_commit
;
262 if (i
->second
.on_applied
)
263 delete i
->second
.on_applied
;
265 clear_recovery_state();
// NOTE(review): only the signature is visible in this listing; the body is
// not shown.
268 void ReplicatedBackend::on_flushed()
// Synchronous object read: forwards to store->read() on collection handle
// `ch` for ghobject_t(hoid), passing off, len, *bl and op_flags, and returns
// whatever the store returns.
// NOTE(review): the declarations of off, len, op_flags and bl are not
// visible in this listing.
272 int ReplicatedBackend::objects_read_sync(
273 const hobject_t
&hoid
,
279 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
// GenContext wrapper that carries a result code `r` and a Context `c`; used
// by objects_read_async() to deliver read completions through the recovery
// work queue.
// NOTE(review): the member declarations and the bodies of finish() and the
// destructor are not visible in this listing.
282 struct AsyncReadCallback
: public GenContext
<ThreadPool::TPHandle
&> {
285 AsyncReadCallback(int r
, Context
*c
) : r(r
), c(c
) {}
286 void finish(ThreadPool::TPHandle
&) override
{
290 ~AsyncReadCallback() override
{
// "Async" read of several extents of `hoid`.
// Each entry of `to_read` is ((offset, length, flags), (bufferlist*,
// Context*)). The reads themselves are done inline with store->read(); only
// the completions are deferred: each per-extent Context (when non-null)
// and finally `on_complete` are wrapped in an AsyncReadCallback and queued
// with schedule_recovery_work(). The loop stops once `r` goes negative.
// NOTE(review): the declaration/update of `r`, the remaining parameters and
// the loop initialiser/increment are not visible in this listing.
294 void ReplicatedBackend::objects_read_async(
295 const hobject_t
&hoid
,
296 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
297 pair
<bufferlist
*, Context
*> > > &to_read
,
298 Context
*on_complete
,
301 // There is no fast read implementation for replication backend yet
305 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
306 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
308 i
!= to_read
.end() && r
>= 0;
310 int _r
= store
->read(ch
, ghobject_t(hoid
), i
->first
.get
<0>(),
311 i
->first
.get
<1>(), *(i
->second
.first
),
// per-extent callback gets this extent's own result (_r)
313 if (i
->second
.second
) {
314 get_parent()->schedule_recovery_work(
315 get_parent()->bless_gencontext(
316 new AsyncReadCallback(_r
, i
->second
.second
)));
// overall completion gets the aggregate result (r)
321 get_parent()->schedule_recovery_work(
322 get_parent()->bless_gencontext(
323 new AsyncReadCallback(r
, on_complete
)));
// Context binding a ReplicatedBackend to one of its InProgressOps; it is
// registered as the on-commit callback in submit_transaction().
// NOTE(review): the constructor's initialiser list and the body of finish()
// are not visible in this listing.
326 class C_OSD_OnOpCommit
: public Context
{
327 ReplicatedBackend
*pg
;
328 ReplicatedBackend::InProgressOp
*op
;
330 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
332 void finish(int) override
{
// Context binding a ReplicatedBackend to one of its InProgressOps; it is
// registered as the on-applied callback in submit_transaction().
// NOTE(review): the constructor's initialiser list and the body of finish()
// are not visible in this listing.
337 class C_OSD_OnOpApplied
: public Context
{
338 ReplicatedBackend
*pg
;
339 ReplicatedBackend::InProgressOp
*op
;
341 C_OSD_OnOpApplied(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
343 void finish(int) override
{
348 void generate_transaction(
349 PGTransactionUPtr
&pgt
,
351 bool legacy_log_entries
,
352 vector
<pg_log_entry_t
> &log_entries
,
353 ObjectStore::Transaction
*t
,
354 set
<hobject_t
> *added
,
355 set
<hobject_t
> *removed
)
361 for (auto &&le
: log_entries
) {
362 le
.mark_unrollbackable();
363 auto oiter
= pgt
->op_map
.find(le
.soid
);
364 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
365 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
366 ::encode(oiter
->second
.updated_snaps
->second
, bl
);
368 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
372 pgt
->safe_create_traverse(
373 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
374 const hobject_t
&oid
= obj_op
.first
;
375 const ghobject_t goid
=
376 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
377 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
380 if (op
.is_fresh_object()) {
382 } else if (op
.is_delete()) {
383 removed
->insert(oid
);
387 if (op
.delete_first
) {
388 t
->remove(coll
, goid
);
393 [&](const PGTransaction::ObjectOperation::Init::None
&) {
395 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
396 t
->touch(coll
, goid
);
398 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
402 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
405 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
406 assert(op
.source
.is_temp());
407 t
->collection_move_rename(
410 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
416 t
->truncate(coll
, goid
, op
.truncate
->first
);
417 if (op
.truncate
->first
!= op
.truncate
->second
)
418 t
->truncate(coll
, goid
, op
.truncate
->second
);
421 if (!op
.attr_updates
.empty()) {
422 map
<string
, bufferlist
> attrs
;
423 for (auto &&p
: op
.attr_updates
) {
425 attrs
[p
.first
] = *(p
.second
);
427 t
->rmattr(coll
, goid
, p
.first
);
429 t
->setattrs(coll
, goid
, attrs
);
433 t
->omap_clear(coll
, goid
);
435 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
437 for (auto &&up
: op
.omap_updates
) {
438 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
440 case UpdateType::Remove
:
441 t
->omap_rmkeys(coll
, goid
, up
.second
);
443 case UpdateType::Insert
:
444 t
->omap_setkeys(coll
, goid
, up
.second
);
449 // updated_snaps doesn't matter since we marked unrollbackable
452 auto &hint
= *(op
.alloc_hint
);
456 hint
.expected_object_size
,
457 hint
.expected_write_size
,
461 for (auto &&extent
: op
.buffer_updates
) {
462 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
465 [&](const BufferUpdate::Write
&op
) {
473 [&](const BufferUpdate::Zero
&op
) {
480 [&](const BufferUpdate::CloneRange
&op
) {
481 assert(op
.len
== extent
.get_len());
484 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
494 void ReplicatedBackend::submit_transaction(
495 const hobject_t
&soid
,
496 const object_stat_sum_t
&delta_stats
,
497 const eversion_t
&at_version
,
498 PGTransactionUPtr
&&_t
,
499 const eversion_t
&trim_to
,
500 const eversion_t
&roll_forward_to
,
501 const vector
<pg_log_entry_t
> &_log_entries
,
502 boost::optional
<pg_hit_set_history_t
> &hset_history
,
503 Context
*on_local_applied_sync
,
504 Context
*on_all_acked
,
505 Context
*on_all_commit
,
508 OpRequestRef orig_op
)
514 vector
<pg_log_entry_t
> log_entries(_log_entries
);
515 ObjectStore::Transaction op_t
;
516 PGTransactionUPtr
t(std::move(_t
));
517 set
<hobject_t
> added
, removed
;
518 generate_transaction(
521 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
526 assert(added
.size() <= 1);
527 assert(removed
.size() <= 1);
529 assert(!in_progress_ops
.count(tid
));
530 InProgressOp
&op
= in_progress_ops
.insert(
534 tid
, on_all_commit
, on_all_acked
,
539 op
.waiting_for_applied
.insert(
540 parent
->get_actingbackfill_shards().begin(),
541 parent
->get_actingbackfill_shards().end());
542 op
.waiting_for_commit
.insert(
543 parent
->get_actingbackfill_shards().begin(),
544 parent
->get_actingbackfill_shards().end());
553 added
.size() ? *(added
.begin()) : hobject_t(),
554 removed
.size() ? *(removed
.begin()) : hobject_t(),
560 add_temp_objs(added
);
561 clear_temp_objs(removed
);
563 parent
->log_operation(
571 op_t
.register_on_applied_sync(on_local_applied_sync
);
572 op_t
.register_on_applied(
573 parent
->bless_context(
574 new C_OSD_OnOpApplied(this, &op
)));
575 op_t
.register_on_commit(
576 parent
->bless_context(
577 new C_OSD_OnOpCommit(this, &op
)));
579 vector
<ObjectStore::Transaction
> tls
;
580 tls
.push_back(std::move(op_t
));
582 parent
->queue_transactions(tls
, op
.op
);
// Local "applied" completion for an in-progress op.
// Emits trace/log events, removes this shard from the op's
// waiting_for_applied set and tells the parent the op's version was applied.
// When no shard is left in waiting_for_applied, on_applied is completed.
// The op is erased from in_progress_ops once neither callback remains.
// NOTE(review): the parameter list and the guards around the event marking
// and the final erase are not visible in this listing.
585 void ReplicatedBackend::op_applied(
589 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_APPLIED_BEGIN", true);
590 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
592 op
->op
->mark_event("op_applied");
593 op
->op
->pg_trace
.event("op applied");
596 op
->waiting_for_applied
.erase(get_parent()->whoami_shard());
597 parent
->op_applied(op
->v
);
599 if (op
->waiting_for_applied
.empty()) {
600 op
->on_applied
->complete(0);
604 assert(!op
->on_commit
&& !op
->on_applied
);
605 in_progress_ops
.erase(op
->tid
);
// Local "commit" completion for an in-progress op.
// Emits trace/log events and removes this shard from the op's
// waiting_for_commit set. When no shard is left in that set, on_commit is
// completed. The op is erased from in_progress_ops once neither callback
// remains.
// NOTE(review): the parameter list and the guards around the event marking
// and the final erase are not visible in this listing.
609 void ReplicatedBackend::op_commit(
613 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
614 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
616 op
->op
->mark_event("op_commit");
617 op
->op
->pg_trace
.event("op commit");
620 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
622 if (op
->waiting_for_commit
.empty()) {
623 op
->on_commit
->complete(0);
627 assert(!op
->on_commit
&& !op
->on_applied
);
628 in_progress_ops
.erase(op
->tid
);
632 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
634 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
635 const MOSDRepOpReply
*r
= static_cast<const MOSDRepOpReply
*>(op
->get_req());
636 assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
640 // must be replication.
641 ceph_tid_t rep_tid
= r
->get_tid();
642 pg_shard_t from
= r
->from
;
644 if (in_progress_ops
.count(rep_tid
)) {
645 map
<ceph_tid_t
, InProgressOp
>::iterator iter
=
646 in_progress_ops
.find(rep_tid
);
647 InProgressOp
&ip_op
= iter
->second
;
648 const MOSDOp
*m
= NULL
;
650 m
= static_cast<const MOSDOp
*>(ip_op
.op
->get_req());
653 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
654 << " ack_type " << (int)r
->ack_type
658 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
659 << " ack_type " << (int)r
->ack_type
665 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
666 assert(ip_op
.waiting_for_commit
.count(from
));
667 ip_op
.waiting_for_commit
.erase(from
);
670 ss
<< "sub_op_commit_rec from " << from
;
671 ip_op
.op
->mark_event_string(ss
.str());
672 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
675 assert(ip_op
.waiting_for_applied
.count(from
));
678 ss
<< "sub_op_applied_rec from " << from
;
679 ip_op
.op
->mark_event_string(ss
.str());
680 ip_op
.op
->pg_trace
.event("sub_op_applied_rec");
683 ip_op
.waiting_for_applied
.erase(from
);
685 parent
->update_peer_last_complete_ondisk(
687 r
->get_last_complete_ondisk());
689 if (ip_op
.waiting_for_applied
.empty() &&
691 ip_op
.on_applied
->complete(0);
692 ip_op
.on_applied
= 0;
694 if (ip_op
.waiting_for_commit
.empty() &&
696 ip_op
.on_commit
->complete(0);
700 assert(!ip_op
.on_commit
&& !ip_op
.on_applied
);
701 in_progress_ops
.erase(iter
);
706 int ReplicatedBackend::be_deep_scrub(
707 const hobject_t
&poid
,
709 ScrubMapBuilder
&pos
,
712 dout(10) << __func__
<< " " << poid
<< " pos " << pos
<< dendl
;
714 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
|
715 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
|
716 CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE
;
719 sleeptime
.set_from_double(cct
->_conf
->osd_debug_deep_scrub_sleep
);
720 if (sleeptime
!= utime_t()) {
721 lgeneric_derr(cct
) << __func__
<< " sleeping for " << sleeptime
<< dendl
;
725 assert(poid
== pos
.ls
[pos
.pos
]);
726 if (!pos
.data_done()) {
727 if (pos
.data_pos
== 0) {
728 pos
.data_hash
= bufferhash(-1);
735 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
737 cct
->_conf
->osd_deep_scrub_stride
, bl
,
740 dout(20) << __func__
<< " " << poid
<< " got "
741 << r
<< " on read, read_error" << dendl
;
749 if (r
== cct
->_conf
->osd_deep_scrub_stride
) {
750 dout(20) << __func__
<< " " << poid
<< " more data, digest so far 0x"
751 << std::hex
<< pos
.data_hash
.digest() << std::dec
<< dendl
;
756 o
.digest
= pos
.data_hash
.digest();
757 o
.digest_present
= true;
758 dout(20) << __func__
<< " " << poid
<< " done with data, digest 0x"
759 << std::hex
<< o
.digest
<< std::dec
<< dendl
;
763 if (pos
.omap_pos
.empty()) {
764 pos
.omap_hash
= bufferhash(-1);
767 r
= store
->omap_get_header(
770 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
773 dout(20) << __func__
<< " " << poid
<< " got "
774 << r
<< " on omap header read, read_error" << dendl
;
778 if (r
== 0 && hdrbl
.length()) {
779 dout(25) << "CRC header " << string(hdrbl
.c_str(), hdrbl
.length())
781 pos
.omap_hash
<< hdrbl
;
786 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
789 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
791 if (pos
.omap_pos
.length()) {
792 iter
->lower_bound(pos
.omap_pos
);
794 iter
->seek_to_first();
796 int max
= g_conf
->osd_deep_scrub_keys
;
797 while (iter
->status() == 0 && iter
->valid()) {
798 pos
.omap_bytes
+= iter
->value().length();
801 // fixme: we can do this more efficiently.
803 ::encode(iter
->key(), bl
);
804 ::encode(iter
->value(), bl
);
809 if (iter
->valid() && max
== 0) {
810 pos
.omap_pos
= iter
->key();
813 if (iter
->status() < 0) {
814 dout(25) << __func__
<< " " << poid
815 << " on omap scan, db status error" << dendl
;
821 if (pos
.omap_keys
> cct
->_conf
->
822 osd_deep_scrub_large_omap_object_key_threshold
||
823 pos
.omap_bytes
> cct
->_conf
->
824 osd_deep_scrub_large_omap_object_value_sum_threshold
) {
825 dout(25) << __func__
<< " " << poid
826 << " large omap object detected. Object has " << pos
.omap_keys
827 << " keys and size " << pos
.omap_bytes
<< " bytes" << dendl
;
828 o
.large_omap_object_found
= true;
829 o
.large_omap_object_key_count
= pos
.omap_keys
;
830 o
.large_omap_object_value_size
= pos
.omap_bytes
;
831 map
.has_large_omap_object_errors
= true;
834 o
.omap_digest
= pos
.omap_hash
.digest();
835 o
.omap_digest_present
= true;
836 dout(20) << __func__
<< " done with " << poid
<< " omap_digest "
837 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
// Handle an incoming MOSDPGPush.
// After a failsafe-full check, each PushOp in the message is applied with
// handle_push() into one transaction `t`, producing one PushReplyOp each.
// The replies are packed into an MOSDPGPushReply (same priority and epochs
// as the request) that is sent on the request's connection when `t`
// completes; then `t` is queued.
// NOTE(review): what happens when the failsafe check trips, and the
// declaration of `ss`, are not visible in this listing.
843 void ReplicatedBackend::_do_push(OpRequestRef op
)
845 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
846 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
847 pg_shard_t from
= m
->from
;
851 vector
<PushReplyOp
> replies
;
852 ObjectStore::Transaction t
;
854 if (get_parent()->check_failsafe_full(ss
)) {
855 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
858 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
859 i
!= m
->pushes
.end();
861 replies
.push_back(PushReplyOp());
862 handle_push(from
, *i
, &(replies
.back()), &t
);
865 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
866 reply
->from
= get_parent()->whoami_shard();
867 reply
->set_priority(m
->get_priority());
868 reply
->pgid
= get_info().pgid
;
869 reply
->map_epoch
= m
->map_epoch
;
870 reply
->min_epoch
= m
->min_epoch
;
871 reply
->replies
.swap(replies
);
872 reply
->compute_cost(cct
);
// the reply is only sent once the transaction has completed
874 t
.register_on_complete(
875 new PG_SendMessageOnConn(
876 get_parent(), reply
, m
->get_connection()));
878 get_parent()->queue_transaction(std::move(t
));
// Continuation run after a batch of pulls has completed.
// For every entry in `to_continue`: look up the pull (it must exist), keep
// its object context, clear the pull, and start pushes for the object.
// One branch clears pushing[hoid] and reports primary_failed/primary_error;
// when nothing was started the object is reported via on_global_recover().
// The thread-pool timeout is reset per object and the recovery handle
// opened at the top is run at `priority` at the end.
// NOTE(review): the `priority` member declaration and the condition guarding
// the failure branch are not visible in this listing.
881 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
882 ReplicatedBackend
*bc
;
883 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
885 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
886 : bc(bc
), priority(priority
) {}
888 void finish(ThreadPool::TPHandle
&handle
) override
{
889 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
890 for (auto &&i
: to_continue
) {
891 auto j
= bc
->pulling
.find(i
.hoid
);
892 assert(j
!= bc
->pulling
.end());
// take the obc before clear_pull() removes the entry `j` points at
893 ObjectContextRef obc
= j
->second
.obc
;
894 bc
->clear_pull(j
, false /* already did it */);
895 int started
= bc
->start_pushes(i
.hoid
, obc
, h
);
897 bc
->pushing
[i
.hoid
].clear();
898 bc
->get_parent()->primary_failed(i
.hoid
);
899 bc
->get_parent()->primary_error(i
.hoid
, obc
->obs
.oi
.version
);
900 } else if (!started
) {
901 bc
->get_parent()->on_global_recover(
902 i
.hoid
, i
.stat
, false);
904 handle
.reset_tp_timeout();
906 bc
->run_recovery_op(h
, priority
);
910 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
912 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
913 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
914 pg_shard_t from
= m
->from
;
918 vector
<PullOp
> replies(1);
921 if (get_parent()->check_failsafe_full(ss
)) {
922 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push): " << ss
.str() << dendl
;
926 ObjectStore::Transaction t
;
927 list
<pull_complete_info
> to_continue
;
928 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
929 i
!= m
->pushes
.end();
931 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
933 replies
.push_back(PullOp());
935 if (!to_continue
.empty()) {
936 C_ReplicatedBackend_OnPullComplete
*c
=
937 new C_ReplicatedBackend_OnPullComplete(
940 c
->to_continue
.swap(to_continue
);
941 t
.register_on_complete(
942 new PG_RecoveryQueueAsync(
944 get_parent()->bless_gencontext(c
)));
946 replies
.erase(replies
.end() - 1);
948 if (replies
.size()) {
949 MOSDPGPull
*reply
= new MOSDPGPull
;
950 reply
->from
= parent
->whoami_shard();
951 reply
->set_priority(m
->get_priority());
952 reply
->pgid
= get_info().pgid
;
953 reply
->map_epoch
= m
->map_epoch
;
954 reply
->min_epoch
= m
->min_epoch
;
955 reply
->set_pulls(&replies
);
956 reply
->compute_cost(cct
);
958 t
.register_on_complete(
959 new PG_SendMessageOnConn(
960 get_parent(), reply
, m
->get_connection()));
963 get_parent()->queue_transaction(std::move(t
));
// Handle an incoming MOSDPGPull: take the PullOps out of the message, build
// one PushOp per pull with handle_pull(), and send the resulting pushes
// back to the sender at the message's priority.
966 void ReplicatedBackend::do_pull(OpRequestRef op
)
968 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
969 assert(m
->get_type() == MSG_OSD_PG_PULL
);
970 pg_shard_t from
= m
->from
;
972 map
<pg_shard_t
, vector
<PushOp
> > replies
;
973 vector
<PullOp
> pulls
;
// needs the non-const request: take_pulls() moves the ops out of the message
974 m
->take_pulls(&pulls
);
975 for (auto& i
: pulls
) {
976 replies
[from
].push_back(PushOp());
977 handle_pull(from
, i
, &(replies
[from
].back()));
979 send_pushes(m
->get_priority(), replies
);
// Handle an incoming MOSDPGPushReply.
// `replies` starts with one spare PushOp; handle_push_reply() fills the
// last slot for each reply and returns `more`. The spare slot at the end is
// erased after the loop and the remaining PushOps are sent back to `from`
// at the message's priority.
// NOTE(review): the test on `more` that guards the push_back is not visible
// in this listing.
982 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
984 const MOSDPGPushReply
*m
= static_cast<const MOSDPGPushReply
*>(op
->get_req());
985 assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
986 pg_shard_t from
= m
->from
;
988 vector
<PushOp
> replies(1);
989 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
990 i
!= m
->replies
.end();
992 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
994 replies
.push_back(PushOp());
// drop the unused spare slot
996 replies
.erase(replies
.end() - 1);
998 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
999 _replies
[from
].swap(replies
);
1000 send_pushes(m
->get_priority(), _replies
);
1003 Message
* ReplicatedBackend::generate_subop(
1004 const hobject_t
&soid
,
1005 const eversion_t
&at_version
,
1008 eversion_t pg_trim_to
,
1009 eversion_t pg_roll_forward_to
,
1010 hobject_t new_temp_oid
,
1011 hobject_t discard_temp_oid
,
1012 const vector
<pg_log_entry_t
> &log_entries
,
1013 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1014 ObjectStore::Transaction
&op_t
,
1016 const pg_info_t
&pinfo
)
1018 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
1019 // forward the write/update/whatever
1020 MOSDRepOp
*wr
= new MOSDRepOp(
1021 reqid
, parent
->whoami_shard(),
1022 spg_t(get_info().pgid
.pgid
, peer
.shard
),
1024 get_osdmap()->get_epoch(),
1025 parent
->get_last_peering_reset_epoch(),
1028 // ship resulting transaction, log entries, and pg_stats
1029 if (!parent
->should_send_op(peer
, soid
)) {
1030 dout(10) << "issue_repop shipping empty opt to osd." << peer
1031 <<", object " << soid
1032 << " beyond MAX(last_backfill_started "
1033 << ", pinfo.last_backfill "
1034 << pinfo
.last_backfill
<< ")" << dendl
;
1035 ObjectStore::Transaction t
;
1036 ::encode(t
, wr
->get_data());
1038 ::encode(op_t
, wr
->get_data());
1039 wr
->get_header().data_off
= op_t
.get_data_alignment();
1042 ::encode(log_entries
, wr
->logbl
);
1044 if (pinfo
.is_incomplete())
1045 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
1047 wr
->pg_stats
= get_info().stats
;
1049 wr
->pg_trim_to
= pg_trim_to
;
1050 wr
->pg_roll_forward_to
= pg_roll_forward_to
;
1052 wr
->new_temp_oid
= new_temp_oid
;
1053 wr
->discard_temp_oid
= discard_temp_oid
;
1054 wr
->updated_hit_set_history
= hset_hist
;
// Send a replicated write to the other shards.
// Records in the op's event log which replicas are being waited on (only
// when there is more than one actingbackfill shard), then for every
// actingbackfill shard except this one builds a message with
// generate_subop() and sends it to that peer's OSD.
// NOTE(review): some parameters (including `op`), the declarations of `ss`
// and `wr`, and the arguments passed to generate_subop() are not visible in
// this listing.
1058 void ReplicatedBackend::issue_op(
1059 const hobject_t
&soid
,
1060 const eversion_t
&at_version
,
1063 eversion_t pg_trim_to
,
1064 eversion_t pg_roll_forward_to
,
1065 hobject_t new_temp_oid
,
1066 hobject_t discard_temp_oid
,
1067 const vector
<pg_log_entry_t
> &log_entries
,
1068 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1070 ObjectStore::Transaction
&op_t
)
1073 op
->op
->pg_trace
.event("issue replication ops");
1075 if (parent
->get_actingbackfill_shards().size() > 1) {
1077 set
<pg_shard_t
> replicas
= parent
->get_actingbackfill_shards();
1078 replicas
.erase(parent
->whoami_shard());
1079 ss
<< "waiting for subops from " << replicas
;
1081 op
->op
->mark_sub_op_sent(ss
.str());
1083 for (set
<pg_shard_t
>::const_iterator i
=
1084 parent
->get_actingbackfill_shards().begin();
1085 i
!= parent
->get_actingbackfill_shards().end();
// never send the op to ourselves
1087 if (*i
== parent
->whoami_shard()) continue;
1088 pg_shard_t peer
= *i
;
1089 const pg_info_t
&pinfo
= parent
->get_shard_info().find(peer
)->second
;
1092 wr
= generate_subop(
1107 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1108 get_parent()->send_message_osd_cluster(
1109 peer
.osd
, wr
, get_osdmap()->get_epoch());
1114 void ReplicatedBackend::do_repop(OpRequestRef op
)
1116 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1117 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(op
->get_req());
1118 int msg_type
= m
->get_type();
1119 assert(MSG_OSD_REPOP
== msg_type
);
1121 const hobject_t
& soid
= m
->poid
;
1123 dout(10) << __func__
<< " " << soid
1124 << " v " << m
->version
1125 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1126 << " " << m
->logbl
.length()
1130 assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1132 // we better not be missing this.
1133 assert(!parent
->get_log().get_missing().is_missing(soid
));
1135 parent
->maybe_preempt_replica_scrub(soid
);
1137 int ackerosd
= m
->get_source().num();
1141 RepModifyRef
rm(std::make_shared
<RepModify
>());
1143 rm
->ackerosd
= ackerosd
;
1144 rm
->last_complete
= get_info().last_complete
;
1145 rm
->epoch_started
= get_osdmap()->get_epoch();
1147 assert(m
->logbl
.length());
1148 // shipped transaction and log entries
1149 vector
<pg_log_entry_t
> log
;
1151 bufferlist::iterator p
= const_cast<bufferlist
&>(m
->get_data()).begin();
1152 ::decode(rm
->opt
, p
);
1154 if (m
->new_temp_oid
!= hobject_t()) {
1155 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1156 add_temp_obj(m
->new_temp_oid
);
1158 if (m
->discard_temp_oid
!= hobject_t()) {
1159 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1160 if (rm
->opt
.empty()) {
1161 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1162 << " since we won't get the transaction" << dendl
;
1163 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1165 clear_temp_obj(m
->discard_temp_oid
);
1168 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1170 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1172 bool update_snaps
= false;
1173 if (!rm
->opt
.empty()) {
1174 // If the opt is non-empty, we infer we are before
1175 // last_backfill (according to the primary, not our
1176 // not-quite-accurate value), and should update the
1177 // collections now. Otherwise, we do it later on push.
1178 update_snaps
= true;
1180 parent
->update_stats(m
->pg_stats
);
1181 parent
->log_operation(
1183 m
->updated_hit_set_history
,
1185 m
->pg_roll_forward_to
,
1189 rm
->opt
.register_on_commit(
1190 parent
->bless_context(
1191 new C_OSD_RepModifyCommit(this, rm
)));
1192 rm
->localt
.register_on_applied(
1193 parent
->bless_context(
1194 new C_OSD_RepModifyApply(this, rm
)));
1195 vector
<ObjectStore::Transaction
> tls
;
1197 tls
.push_back(std::move(rm
->localt
));
1198 tls
.push_back(std::move(rm
->opt
));
1199 parent
->queue_transactions(tls
, op
);
1200 // op is cleaned up by oncommit/onapply when both are executed
// "Applied" completion for a replicated modify received as an MOSDRepOp.
// Marks the event on the op; if no commit has been sent yet, sends an
// MOSDRepOpReply with CEPH_OSD_FLAG_ACK to rm->ackerosd at high priority;
// finally reports the request's version to the parent as applied.
// NOTE(review): the trace string "sup_op_applied" looks like a typo for
// "sub_op_applied"; it is a runtime string and is left untouched here.
1203 void ReplicatedBackend::repop_applied(RepModifyRef rm
)
1205 rm
->op
->mark_event("sub_op_applied");
1207 rm
->op
->pg_trace
.event("sup_op_applied");
1209 dout(10) << __func__
<< " on " << rm
<< " op "
1210 << *rm
->op
->get_req() << dendl
;
1211 const Message
*m
= rm
->op
->get_req();
1212 const MOSDRepOp
*req
= static_cast<const MOSDRepOp
*>(m
);
1213 eversion_t version
= req
->version
;
1215 // send ack to acker only if we haven't sent a commit already
1216 if (!rm
->committed
) {
1217 Message
*ack
= new MOSDRepOpReply(
1218 req
, parent
->whoami_shard(),
1219 0, get_osdmap()->get_epoch(), req
->min_epoch
, CEPH_OSD_FLAG_ACK
);
1220 ack
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match commit priority!
1221 ack
->trace
= rm
->op
->pg_trace
;
1222 get_parent()->send_message_osd_cluster(
1223 rm
->ackerosd
, ack
, get_osdmap()->get_epoch());
1226 parent
->op_applied(version
);
// "Commit" completion for a replicated modify received as an MOSDRepOp.
// Marks the op as commit-sent and sets rm->committed, which is the flag
// repop_applied() checks before sending its ack. It then records
// rm->last_complete as the last complete on-disk version, sends an
// MOSDRepOpReply with CEPH_OSD_FLAG_ONDISK to rm->ackerosd at high priority,
// and logs write sub-op stats. The acker OSD is asserted to be up.
// NOTE(review): the trace string "sup_op_commit" looks like a typo for
// "sub_op_commit"; it is a runtime string and is left untouched here.
1229 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1231 rm
->op
->mark_commit_sent();
1232 rm
->op
->pg_trace
.event("sup_op_commit");
1233 rm
->committed
= true;
1236 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(rm
->op
->get_req());
1237 assert(m
->get_type() == MSG_OSD_REPOP
);
1238 dout(10) << __func__
<< " on op " << *m
1239 << ", sending commit to osd." << rm
->ackerosd
1241 assert(get_osdmap()->is_up(rm
->ackerosd
));
1243 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1245 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1247 get_parent()->whoami_shard(),
1248 0, get_osdmap()->get_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1249 reply
->set_last_complete_ondisk(rm
->last_complete
);
1250 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1251 reply
->trace
= rm
->op
->pg_trace
;
1252 get_parent()->send_message_osd_cluster(
1253 rm
->ackerosd
, reply
, get_osdmap()->get_epoch());
1255 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1259 // ===========================================================
1261 void ReplicatedBackend::calc_head_subsets(
1262 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1263 const pg_missing_t
& missing
,
1264 const hobject_t
&last_backfill
,
1265 interval_set
<uint64_t>& data_subset
,
1266 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1267 ObcLockManager
&manager
)
1269 dout(10) << "calc_head_subsets " << head
1270 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1272 uint64_t size
= obc
->obs
.oi
.size
;
1274 data_subset
.insert(0, size
);
1276 if (get_parent()->get_pool().allow_incomplete_clones()) {
1277 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1281 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1282 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1287 interval_set
<uint64_t> cloning
;
1288 interval_set
<uint64_t> prev
;
1290 prev
.insert(0, size
);
1292 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1294 c
.snap
= snapset
.clones
[j
];
1295 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1296 if (!missing
.is_missing(c
) &&
1297 c
< last_backfill
&&
1298 get_parent()->try_lock_for_read(c
, manager
)) {
1299 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1300 << " overlap " << prev
<< dendl
;
1301 clone_subsets
[c
] = prev
;
1302 cloning
.union_of(prev
);
1305 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1306 << " overlap " << prev
<< dendl
;
1310 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1311 dout(10) << "skipping clone, too many holes" << dendl
;
1312 get_parent()->release_locks(manager
);
1313 clone_subsets
.clear();
1317 // what's left for us to push?
1318 data_subset
.subtract(cloning
);
1320 dout(10) << "calc_head_subsets " << head
1321 << " data_subset " << data_subset
1322 << " clone_subsets " << clone_subsets
<< dendl
;
1325 void ReplicatedBackend::calc_clone_subsets(
1326 SnapSet
& snapset
, const hobject_t
& soid
,
1327 const pg_missing_t
& missing
,
1328 const hobject_t
&last_backfill
,
1329 interval_set
<uint64_t>& data_subset
,
1330 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1331 ObcLockManager
&manager
)
1333 dout(10) << "calc_clone_subsets " << soid
1334 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1336 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1338 data_subset
.insert(0, size
);
1340 if (get_parent()->get_pool().allow_incomplete_clones()) {
1341 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1345 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1346 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1351 for (i
=0; i
< snapset
.clones
.size(); i
++)
1352 if (snapset
.clones
[i
] == soid
.snap
)
1355 // any overlap with next older clone?
1356 interval_set
<uint64_t> cloning
;
1357 interval_set
<uint64_t> prev
;
1359 prev
.insert(0, size
);
1360 for (int j
=i
-1; j
>=0; j
--) {
1362 c
.snap
= snapset
.clones
[j
];
1363 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1364 if (!missing
.is_missing(c
) &&
1365 c
< last_backfill
&&
1366 get_parent()->try_lock_for_read(c
, manager
)) {
1367 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1368 << " overlap " << prev
<< dendl
;
1369 clone_subsets
[c
] = prev
;
1370 cloning
.union_of(prev
);
1373 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1374 << " overlap " << prev
<< dendl
;
1377 // overlap with next newest?
1378 interval_set
<uint64_t> next
;
1380 next
.insert(0, size
);
1381 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1383 c
.snap
= snapset
.clones
[j
];
1384 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1385 if (!missing
.is_missing(c
) &&
1386 c
< last_backfill
&&
1387 get_parent()->try_lock_for_read(c
, manager
)) {
1388 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1389 << " overlap " << next
<< dendl
;
1390 clone_subsets
[c
] = next
;
1391 cloning
.union_of(next
);
1394 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1395 << " overlap " << next
<< dendl
;
1398 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1399 dout(10) << "skipping clone, too many holes" << dendl
;
1400 get_parent()->release_locks(manager
);
1401 clone_subsets
.clear();
1406 // what's left for us to push?
1407 data_subset
.subtract(cloning
);
1409 dout(10) << "calc_clone_subsets " << soid
1410 << " data_subset " << data_subset
1411 << " clone_subsets " << clone_subsets
<< dendl
;
1414 void ReplicatedBackend::prepare_pull(
1416 const hobject_t
& soid
,
1417 ObjectContextRef headctx
,
1420 assert(get_parent()->get_local_missing().get_items().count(soid
));
1421 eversion_t _v
= get_parent()->get_local_missing().get_items().find(
1424 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1425 get_parent()->get_missing_loc_shards());
1426 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1427 get_parent()->get_shard_missing());
1428 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1429 assert(q
!= missing_loc
.end());
1430 assert(!q
->second
.empty());
1433 vector
<pg_shard_t
> shuffle(q
->second
.begin(), q
->second
.end());
1434 random_shuffle(shuffle
.begin(), shuffle
.end());
1435 vector
<pg_shard_t
>::iterator p
= shuffle
.begin();
1436 assert(get_osdmap()->is_up(p
->osd
));
1437 pg_shard_t fromshard
= *p
;
1439 dout(7) << "pull " << soid
1441 << " on osds " << q
->second
1442 << " from osd." << fromshard
1445 assert(peer_missing
.count(fromshard
));
1446 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1447 if (pmissing
.is_missing(soid
, v
)) {
1448 assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1449 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1450 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1451 << " rather than at version " << v
<< dendl
;
1452 v
= pmissing
.get_items().find(soid
)->second
.have
;
1453 assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1454 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1455 pg_log_entry_t::LOST_REVERT
) &&
1456 (get_parent()->get_log().get_log().objects
.find(
1457 soid
)->second
->reverting_to
==
1461 ObjectRecoveryInfo recovery_info
;
1462 ObcLockManager lock_manager
;
1464 if (soid
.is_snap()) {
1465 assert(!get_parent()->get_local_missing().is_missing(
1467 !get_parent()->get_local_missing().is_missing(
1468 soid
.get_snapdir()));
1471 SnapSetContext
*ssc
= headctx
->ssc
;
1473 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1474 recovery_info
.ss
= ssc
->snapset
;
1476 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1477 get_info().last_backfill
,
1478 recovery_info
.copy_subset
,
1479 recovery_info
.clone_subset
,
1481 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1482 dout(10) << " pulling " << recovery_info
<< dendl
;
1484 assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1485 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1487 // pulling head or unversioned object.
1488 // always pull the whole thing.
1489 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1490 recovery_info
.size
= ((uint64_t)-1);
1493 h
->pulls
[fromshard
].push_back(PullOp());
1494 PullOp
&op
= h
->pulls
[fromshard
].back();
1497 op
.recovery_info
= recovery_info
;
1498 op
.recovery_info
.soid
= soid
;
1499 op
.recovery_info
.version
= v
;
1500 op
.recovery_progress
.data_complete
= false;
1501 op
.recovery_progress
.omap_complete
= false;
1502 op
.recovery_progress
.data_recovered_to
= 0;
1503 op
.recovery_progress
.first
= true;
1505 assert(!pulling
.count(soid
));
1506 pull_from_peer
[fromshard
].insert(soid
);
1507 PullInfo
&pi
= pulling
[soid
];
1508 pi
.from
= fromshard
;
1510 pi
.head_ctx
= headctx
;
1511 pi
.recovery_info
= op
.recovery_info
;
1512 pi
.recovery_progress
= op
.recovery_progress
;
1513 pi
.cache_dont_need
= h
->cache_dont_need
;
1514 pi
.lock_manager
= std::move(lock_manager
);
1518 * intelligently push an object to a replica. make use of existing
1519 * clones/heads and dup data ranges where possible.
1521 int ReplicatedBackend::prep_push_to_replica(
1522 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1523 PushOp
*pop
, bool cache_dont_need
)
1525 const object_info_t
& oi
= obc
->obs
.oi
;
1526 uint64_t size
= obc
->obs
.oi
.size
;
1528 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1529 << " size " << size
<< " to osd." << peer
<< dendl
;
1531 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1532 interval_set
<uint64_t> data_subset
;
1534 ObcLockManager lock_manager
;
1535 // are we doing a clone on the replica?
1536 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1537 hobject_t head
= soid
;
1538 head
.snap
= CEPH_NOSNAP
;
1540 // try to base push off of clones that succeed/preceed poid
1541 // we need the head (and current SnapSet) locally to do that.
1542 if (get_parent()->get_local_missing().is_missing(head
)) {
1543 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1544 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1546 hobject_t snapdir
= head
;
1547 snapdir
.snap
= CEPH_SNAPDIR
;
1548 if (get_parent()->get_local_missing().is_missing(snapdir
)) {
1549 dout(15) << "push_to_replica missing snapdir " << snapdir
1550 << ", pushing raw clone" << dendl
;
1551 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1554 SnapSetContext
*ssc
= obc
->ssc
;
1556 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1557 pop
->recovery_info
.ss
= ssc
->snapset
;
1558 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1559 get_parent()->get_shard_missing().find(peer
);
1560 assert(pm
!= get_parent()->get_shard_missing().end());
1561 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1562 get_parent()->get_shard_info().find(peer
);
1563 assert(pi
!= get_parent()->get_shard_info().end());
1567 pi
->second
.last_backfill
,
1568 data_subset
, clone_subsets
,
1570 } else if (soid
.snap
== CEPH_NOSNAP
) {
1571 // pushing head or unversioned object.
1572 // base this on partially on replica's clones?
1573 SnapSetContext
*ssc
= obc
->ssc
;
1575 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1578 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1579 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1580 data_subset
, clone_subsets
,
1593 std::move(lock_manager
));
1596 int ReplicatedBackend::prep_push(ObjectContextRef obc
,
1597 const hobject_t
& soid
, pg_shard_t peer
,
1598 PushOp
*pop
, bool cache_dont_need
)
1600 interval_set
<uint64_t> data_subset
;
1601 if (obc
->obs
.oi
.size
)
1602 data_subset
.insert(0, obc
->obs
.oi
.size
);
1603 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1605 return prep_push(obc
, soid
, peer
,
1606 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1607 pop
, cache_dont_need
, ObcLockManager());
1610 int ReplicatedBackend::prep_push(
1611 ObjectContextRef obc
,
1612 const hobject_t
& soid
, pg_shard_t peer
,
1614 interval_set
<uint64_t> &data_subset
,
1615 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1617 bool cache_dont_need
,
1618 ObcLockManager
&&lock_manager
)
1620 get_parent()->begin_peer_recover(peer
, soid
);
1622 PushInfo
&pi
= pushing
[soid
][peer
];
1624 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1625 pi
.recovery_info
.copy_subset
= data_subset
;
1626 pi
.recovery_info
.clone_subset
= clone_subsets
;
1627 pi
.recovery_info
.soid
= soid
;
1628 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1629 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1630 pi
.recovery_info
.version
= version
;
1631 pi
.lock_manager
= std::move(lock_manager
);
1633 ObjectRecoveryProgress new_progress
;
1634 int r
= build_push_op(pi
.recovery_info
,
1635 pi
.recovery_progress
,
1638 &(pi
.stat
), cache_dont_need
);
1641 pi
.recovery_progress
= new_progress
;
1645 void ReplicatedBackend::submit_push_data(
1646 const ObjectRecoveryInfo
&recovery_info
,
1649 bool cache_dont_need
,
1650 const interval_set
<uint64_t> &intervals_included
,
1651 bufferlist data_included
,
1652 bufferlist omap_header
,
1653 const map
<string
, bufferlist
> &attrs
,
1654 const map
<string
, bufferlist
> &omap_entries
,
1655 ObjectStore::Transaction
*t
)
1657 hobject_t target_oid
;
1658 if (first
&& complete
) {
1659 target_oid
= recovery_info
.soid
;
1661 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1662 recovery_info
.version
);
1664 dout(10) << __func__
<< ": Adding oid "
1665 << target_oid
<< " in the temp collection" << dendl
;
1666 add_temp_obj(target_oid
);
1671 t
->remove(coll
, ghobject_t(target_oid
));
1672 t
->touch(coll
, ghobject_t(target_oid
));
1673 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1674 if (omap_header
.length())
1675 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
1677 bufferlist bv
= attrs
.at(OI_ATTR
);
1678 object_info_t
oi(bv
);
1679 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1680 oi
.expected_object_size
,
1681 oi
.expected_write_size
,
1682 oi
.alloc_hint_flags
);
1685 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1686 if (cache_dont_need
)
1687 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1688 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1689 p
!= intervals_included
.end();
1692 bit
.substr_of(data_included
, off
, p
.get_len());
1693 t
->write(coll
, ghobject_t(target_oid
),
1694 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1698 if (!omap_entries
.empty())
1699 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1701 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1705 dout(10) << __func__
<< ": Removing oid "
1706 << target_oid
<< " from the temp collection" << dendl
;
1707 clear_temp_obj(target_oid
);
1708 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1709 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1710 coll
, ghobject_t(recovery_info
.soid
));
1713 submit_push_complete(recovery_info
, t
);
1717 void ReplicatedBackend::submit_push_complete(
1718 const ObjectRecoveryInfo
&recovery_info
,
1719 ObjectStore::Transaction
*t
)
1721 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1722 recovery_info
.clone_subset
.begin();
1723 p
!= recovery_info
.clone_subset
.end();
1725 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1726 q
!= p
->second
.end();
1728 dout(15) << " clone_range " << p
->first
<< " "
1729 << q
.get_start() << "~" << q
.get_len() << dendl
;
1730 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1731 q
.get_start(), q
.get_len(), q
.get_start());
1736 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1737 const ObjectRecoveryInfo
& recovery_info
,
1738 SnapSetContext
*ssc
,
1739 ObcLockManager
&manager
)
1741 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1742 return recovery_info
;
1743 ObjectRecoveryInfo new_info
= recovery_info
;
1744 new_info
.copy_subset
.clear();
1745 new_info
.clone_subset
.clear();
1747 get_parent()->release_locks(manager
); // might already have locks
1749 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1750 get_info().last_backfill
,
1751 new_info
.copy_subset
, new_info
.clone_subset
,
1756 bool ReplicatedBackend::handle_pull_response(
1757 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1758 list
<pull_complete_info
> *to_continue
,
1759 ObjectStore::Transaction
*t
)
1761 interval_set
<uint64_t> data_included
= pop
.data_included
;
1764 dout(10) << "handle_pull_response "
1765 << pop
.recovery_info
1766 << pop
.after_progress
1767 << " data.size() is " << data
.length()
1768 << " data_included: " << data_included
1770 if (pop
.version
== eversion_t()) {
1771 // replica doesn't have it!
1772 _failed_pull(from
, pop
.soid
);
1776 const hobject_t
&hoid
= pop
.soid
;
1777 assert((data_included
.empty() && data
.length() == 0) ||
1778 (!data_included
.empty() && data
.length() > 0));
1780 auto piter
= pulling
.find(hoid
);
1781 if (piter
== pulling
.end()) {
1785 PullInfo
&pi
= piter
->second
;
1786 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1787 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1788 pi
.recovery_info
.copy_subset
.intersection_of(
1789 pop
.recovery_info
.copy_subset
);
1791 // If primary doesn't have object info and didn't know version
1792 if (pi
.recovery_info
.version
== eversion_t()) {
1793 pi
.recovery_info
.version
= pop
.version
;
1796 bool first
= pi
.recovery_progress
.first
;
1798 // attrs only reference the origin bufferlist (decode from
1799 // MOSDPGPush message) whose size is much greater than attrs in
1800 // recovery. If obc cache it (get_obc maybe cache the attr), this
1801 // causes the whole origin bufferlist would not be free until obc
1802 // is evicted from obc cache. So rebuild the bufferlists before
1804 auto attrset
= pop
.attrset
;
1805 for (auto& a
: attrset
) {
1808 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1809 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1810 pi
.recovery_info
= recalc_subsets(
1817 interval_set
<uint64_t> usable_intervals
;
1818 bufferlist usable_data
;
1819 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1824 data_included
= usable_intervals
;
1825 data
.claim(usable_data
);
1828 pi
.recovery_progress
= pop
.after_progress
;
1830 dout(10) << "new recovery_info " << pi
.recovery_info
1831 << ", new progress " << pi
.recovery_progress
1834 bool complete
= pi
.is_complete();
1836 submit_push_data(pi
.recovery_info
, first
,
1837 complete
, pi
.cache_dont_need
,
1838 data_included
, data
,
1844 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1845 pi
.stat
.num_bytes_recovered
+= data
.length();
1848 pi
.stat
.num_objects_recovered
++;
1849 clear_pull_from(piter
);
1850 to_continue
->push_back({hoid
, pi
.stat
});
1851 get_parent()->on_local_recover(
1852 hoid
, pi
.recovery_info
, pi
.obc
, false, t
);
1855 response
->soid
= pop
.soid
;
1856 response
->recovery_info
= pi
.recovery_info
;
1857 response
->recovery_progress
= pi
.recovery_progress
;
1862 void ReplicatedBackend::handle_push(
1863 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1864 ObjectStore::Transaction
*t
)
1866 dout(10) << "handle_push "
1867 << pop
.recovery_info
1868 << pop
.after_progress
1872 bool first
= pop
.before_progress
.first
;
1873 bool complete
= pop
.after_progress
.data_complete
&&
1874 pop
.after_progress
.omap_complete
;
1876 response
->soid
= pop
.recovery_info
.soid
;
1877 submit_push_data(pop
.recovery_info
,
1880 true, // must be replicate
1889 get_parent()->on_local_recover(
1890 pop
.recovery_info
.soid
,
1892 ObjectContextRef(), // ok, is replica
1897 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1899 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1902 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1904 get_osdmap()->get_epoch());
1907 vector
<PushOp
>::iterator j
= i
->second
.begin();
1908 while (j
!= i
->second
.end()) {
1910 uint64_t pushes
= 0;
1911 MOSDPGPush
*msg
= new MOSDPGPush();
1912 msg
->from
= get_parent()->whoami_shard();
1913 msg
->pgid
= get_parent()->primary_spg_t();
1914 msg
->map_epoch
= get_osdmap()->get_epoch();
1915 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1916 msg
->set_priority(prio
);
1918 (j
!= i
->second
.end() &&
1919 cost
< cct
->_conf
->osd_max_push_cost
&&
1920 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1922 dout(20) << __func__
<< ": sending push " << *j
1923 << " to osd." << i
->first
<< dendl
;
1924 cost
+= j
->cost(cct
);
1926 msg
->pushes
.push_back(*j
);
1928 msg
->set_cost(cost
);
1929 get_parent()->send_message_osd_cluster(msg
, con
);
1934 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1936 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1939 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1941 get_osdmap()->get_epoch());
1944 dout(20) << __func__
<< ": sending pulls " << i
->second
1945 << " to osd." << i
->first
<< dendl
;
1946 MOSDPGPull
*msg
= new MOSDPGPull();
1947 msg
->from
= parent
->whoami_shard();
1948 msg
->set_priority(prio
);
1949 msg
->pgid
= get_parent()->primary_spg_t();
1950 msg
->map_epoch
= get_osdmap()->get_epoch();
1951 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1952 msg
->set_pulls(&i
->second
);
1953 msg
->compute_cost(cct
);
1954 get_parent()->send_message_osd_cluster(msg
, con
);
1958 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1959 const ObjectRecoveryProgress
&progress
,
1960 ObjectRecoveryProgress
*out_progress
,
1962 object_stat_sum_t
*stat
,
1963 bool cache_dont_need
)
1965 ObjectRecoveryProgress _new_progress
;
1967 out_progress
= &_new_progress
;
1968 ObjectRecoveryProgress
&new_progress
= *out_progress
;
1969 new_progress
= progress
;
1971 dout(7) << __func__
<< " " << recovery_info
.soid
1972 << " v " << recovery_info
.version
1973 << " size " << recovery_info
.size
1974 << " recovery_info: " << recovery_info
1977 eversion_t v
= recovery_info
.version
;
1978 if (progress
.first
) {
1979 int r
= store
->omap_get_header(coll
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
1981 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
1984 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
1986 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
1991 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
1994 bufferlist::iterator bliter
= bv
.begin();
1995 ::decode(oi
, bliter
);
1997 dout(0) << __func__
<< ": bad object_info_t: " << recovery_info
.soid
<< dendl
;
2001 // If requestor didn't know the version, use ours
2002 if (v
== eversion_t()) {
2004 } else if (oi
.version
!= v
) {
2005 get_parent()->clog_error() << get_info().pgid
<< " push "
2006 << recovery_info
.soid
<< " v "
2007 << recovery_info
.version
2008 << " failed because local copy is "
2013 new_progress
.first
= false;
2015 // Once we provide the version subsequent requests will have it, so
2016 // at this point it must be known.
2017 assert(v
!= eversion_t());
2019 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
2020 if (!progress
.omap_complete
) {
2021 ObjectMap::ObjectMapIterator iter
=
2022 store
->get_omap_iterator(coll
,
2023 ghobject_t(recovery_info
.soid
));
2025 for (iter
->lower_bound(progress
.omap_recovered_to
);
2027 iter
->next(false)) {
2028 if (!out_op
->omap_entries
.empty() &&
2029 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
2030 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
2031 available
<= iter
->key().size() + iter
->value().length()))
2033 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
2035 if ((iter
->key().size() + iter
->value().length()) <= available
)
2036 available
-= (iter
->key().size() + iter
->value().length());
2041 new_progress
.omap_complete
= true;
2043 new_progress
.omap_recovered_to
= iter
->key();
2046 if (available
> 0) {
2047 if (!recovery_info
.copy_subset
.empty()) {
2048 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
2049 map
<uint64_t, uint64_t> m
;
2050 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
2051 copy_subset
.range_end(), m
);
2053 interval_set
<uint64_t> fiemap_included(m
);
2054 copy_subset
.intersection_of(fiemap_included
);
2056 // intersection of copy_subset and empty interval_set would be empty anyway
2057 copy_subset
.clear();
2060 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
2062 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
2063 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2065 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2068 out_op
->data_included
.clear();
2071 for (interval_set
<uint64_t>::iterator p
= out_op
->data_included
.begin();
2072 p
!= out_op
->data_included
.end();
2075 int r
= store
->read(ch
, ghobject_t(recovery_info
.soid
),
2076 p
.get_start(), p
.get_len(), bit
,
2077 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
2078 if (cct
->_conf
->osd_debug_random_push_read_error
&&
2079 (rand() % (int)(cct
->_conf
->osd_debug_random_push_read_error
* 100.0)) == 0) {
2080 dout(0) << __func__
<< ": inject EIO " << recovery_info
.soid
<< dendl
;
2086 if (p
.get_len() != bit
.length()) {
2087 dout(10) << " extent " << p
.get_start() << "~" << p
.get_len()
2088 << " is actually " << p
.get_start() << "~" << bit
.length()
2090 interval_set
<uint64_t>::iterator save
= p
++;
2091 if (bit
.length() == 0)
2092 out_op
->data_included
.erase(save
); //Remove this empty interval
2094 save
.set_len(bit
.length());
2095 // Remove any other intervals present
2096 while (p
!= out_op
->data_included
.end()) {
2097 interval_set
<uint64_t>::iterator save
= p
++;
2098 out_op
->data_included
.erase(save
);
2100 new_progress
.data_complete
= true;
2101 out_op
->data
.claim_append(bit
);
2104 out_op
->data
.claim_append(bit
);
2107 if (new_progress
.is_complete(recovery_info
)) {
2108 new_progress
.data_complete
= true;
2110 stat
->num_objects_recovered
++;
2114 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2115 stat
->num_bytes_recovered
+= out_op
->data
.length();
2118 get_parent()->get_logger()->inc(l_osd_push
);
2119 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2122 out_op
->version
= v
;
2123 out_op
->soid
= recovery_info
.soid
;
2124 out_op
->recovery_info
= recovery_info
;
2125 out_op
->after_progress
= new_progress
;
2126 out_op
->before_progress
= progress
;
2130 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2132 op
->recovery_info
.version
= eversion_t();
2133 op
->version
= eversion_t();
2137 bool ReplicatedBackend::handle_push_reply(
2138 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2140 const hobject_t
&soid
= op
.soid
;
2141 if (pushing
.count(soid
) == 0) {
2142 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2143 << ", or anybody else"
2146 } else if (pushing
[soid
].count(peer
) == 0) {
2147 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2151 PushInfo
*pi
= &pushing
[soid
][peer
];
2152 bool error
= pushing
[soid
].begin()->second
.recovery_progress
.error
;
2154 if (!pi
->recovery_progress
.data_complete
&& !error
) {
2155 dout(10) << " pushing more from, "
2156 << pi
->recovery_progress
.data_recovered_to
2157 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2158 ObjectRecoveryProgress new_progress
;
2159 int r
= build_push_op(
2161 pi
->recovery_progress
, &new_progress
, reply
,
2163 // Handle the case of a read error right after we wrote, which is
2164 // hopefuilly extremely rare.
2166 dout(5) << __func__
<< ": oid " << soid
<< " error " << r
<< dendl
;
2171 pi
->recovery_progress
= new_progress
;
2177 get_parent()->on_peer_recover( peer
, soid
, pi
->recovery_info
);
2179 get_parent()->release_locks(pi
->lock_manager
);
2180 object_stat_sum_t stat
= pi
->stat
;
2181 eversion_t v
= pi
->recovery_info
.version
;
2182 pushing
[soid
].erase(peer
);
2185 if (pushing
[soid
].empty()) {
2187 get_parent()->on_global_recover(soid
, stat
, false);
2189 get_parent()->on_primary_error(soid
, v
);
2190 pushing
.erase(soid
);
2192 // This looks weird, but we erased the current peer and need to remember
2193 // the error on any other one, while getting more acks.
2195 pushing
[soid
].begin()->second
.recovery_progress
.error
= true;
2196 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2197 << pushing
[soid
].size() << " others" << dendl
;
2204 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2206 const hobject_t
&soid
= op
.soid
;
2208 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2210 get_parent()->clog_error() << get_info().pgid
<< " "
2211 << peer
<< " tried to pull " << soid
2212 << " but got " << cpp_strerror(-r
);
2213 prep_push_op_blank(soid
, reply
);
2215 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2216 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2217 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2218 // Adjust size and copy_subset
2219 recovery_info
.size
= st
.st_size
;
2220 recovery_info
.copy_subset
.clear();
2222 recovery_info
.copy_subset
.insert(0, st
.st_size
);
2223 assert(recovery_info
.clone_subset
.empty());
2226 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2228 prep_push_op_blank(soid
, reply
);
2233 * trim received data to remove what we don't want
2235 * @param copy_subset intervals we want
2236 * @param data_included intervals we got
2237 * @param data_recieved data we got
2238 * @param intervals_usable intervals we want to keep
2239 * @param data_usable matching data we want to keep
2241 void ReplicatedBackend::trim_pushed_data(
2242 const interval_set
<uint64_t> ©_subset
,
2243 const interval_set
<uint64_t> &intervals_received
,
2244 bufferlist data_received
,
2245 interval_set
<uint64_t> *intervals_usable
,
2246 bufferlist
*data_usable
)
2248 if (intervals_received
.subset_of(copy_subset
)) {
2249 *intervals_usable
= intervals_received
;
2250 *data_usable
= data_received
;
2254 intervals_usable
->intersection_of(copy_subset
,
2255 intervals_received
);
2258 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2259 p
!= intervals_received
.end();
2261 interval_set
<uint64_t> x
;
2262 x
.insert(p
.get_start(), p
.get_len());
2263 x
.intersection_of(copy_subset
);
2264 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
2268 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2269 sub
.substr_of(data_received
, data_off
, q
.get_len());
2270 data_usable
->claim_append(sub
);
2276 void ReplicatedBackend::_failed_pull(pg_shard_t from
, const hobject_t
&soid
)
2278 dout(20) << __func__
<< ": " << soid
<< " from " << from
<< dendl
;
2279 list
<pg_shard_t
> fl
= { from
};
2280 get_parent()->failed_push(fl
, soid
);
2282 clear_pull(pulling
.find(soid
));
2285 void ReplicatedBackend::clear_pull_from(
2286 map
<hobject_t
, PullInfo
>::iterator piter
)
2288 auto from
= piter
->second
.from
;
2289 pull_from_peer
[from
].erase(piter
->second
.soid
);
2290 if (pull_from_peer
[from
].empty())
2291 pull_from_peer
.erase(from
);
2294 void ReplicatedBackend::clear_pull(
2295 map
<hobject_t
, PullInfo
>::iterator piter
,
2296 bool clear_pull_from_peer
)
2298 if (clear_pull_from_peer
) {
2299 clear_pull_from(piter
);
2301 get_parent()->release_locks(piter
->second
.lock_manager
);
2302 pulling
.erase(piter
);
2305 int ReplicatedBackend::start_pushes(
2306 const hobject_t
&soid
,
2307 ObjectContextRef obc
,
2310 list
< map
<pg_shard_t
, pg_missing_t
>::const_iterator
> shards
;
2312 dout(20) << __func__
<< " soid " << soid
<< dendl
;
2314 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2315 for (set
<pg_shard_t
>::iterator i
=
2316 get_parent()->get_actingbackfill_shards().begin();
2317 i
!= get_parent()->get_actingbackfill_shards().end();
2319 if (*i
== get_parent()->whoami_shard()) continue;
2320 pg_shard_t peer
= *i
;
2321 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2322 get_parent()->get_shard_missing().find(peer
);
2323 assert(j
!= get_parent()->get_shard_missing().end());
2324 if (j
->second
.is_missing(soid
)) {
2325 shards
.push_back(j
);
2329 // If more than 1 read will occur ignore possible request to not cache
2330 bool cache
= shards
.size() == 1 ? h
->cache_dont_need
: false;
2332 for (auto j
: shards
) {
2333 pg_shard_t peer
= j
->first
;
2334 h
->pushes
[peer
].push_back(PushOp());
2335 int r
= prep_push_to_replica(obc
, soid
, peer
,
2336 &(h
->pushes
[peer
].back()), cache
);
2338 // Back out all failed reads
2339 for (auto k
: shards
) {
2340 pg_shard_t p
= k
->first
;
2341 dout(10) << __func__
<< " clean up peer " << p
<< dendl
;
2342 h
->pushes
[p
].pop_back();
2343 if (p
== peer
) break;
2348 return shards
.size();