// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14 #include "common/errno.h"
15 #include "ReplicatedBackend.h"
16 #include "messages/MOSDOp.h"
17 #include "messages/MOSDSubOp.h"
18 #include "messages/MOSDRepOp.h"
19 #include "messages/MOSDSubOpReply.h"
20 #include "messages/MOSDRepOpReply.h"
21 #include "messages/MOSDPGPush.h"
22 #include "messages/MOSDPGPull.h"
23 #include "messages/MOSDPGPushReply.h"
24 #include "common/EventTrace.h"
26 #define dout_context cct
27 #define dout_subsys ceph_subsys_osd
28 #define DOUT_PREFIX_ARGS this
30 #define dout_prefix _prefix(_dout, this)
31 static ostream
& _prefix(std::ostream
*_dout
, ReplicatedBackend
*pgb
) {
32 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
36 class PG_SendMessageOnConn
: public Context
{
37 PGBackend::Listener
*pg
;
42 PGBackend::Listener
*pg
,
44 ConnectionRef conn
) : pg(pg
), reply(reply
), conn(conn
) {}
45 void finish(int) override
{
46 pg
->send_message_osd_cluster(reply
, conn
.get());
50 class PG_RecoveryQueueAsync
: public Context
{
51 PGBackend::Listener
*pg
;
52 unique_ptr
<GenContext
<ThreadPool::TPHandle
&>> c
;
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener
*pg
,
56 GenContext
<ThreadPool::TPHandle
&> *c
) : pg(pg
), c(c
) {}
57 void finish(int) override
{
58 pg
->schedule_recovery_work(c
.release());
63 struct ReplicatedBackend::C_OSD_RepModifyApply
: public Context
{
64 ReplicatedBackend
*pg
;
66 C_OSD_RepModifyApply(ReplicatedBackend
*pg
, RepModifyRef r
)
68 void finish(int r
) override
{
69 pg
->repop_applied(rm
);
73 struct ReplicatedBackend::C_OSD_RepModifyCommit
: public Context
{
74 ReplicatedBackend
*pg
;
76 C_OSD_RepModifyCommit(ReplicatedBackend
*pg
, RepModifyRef r
)
78 void finish(int r
) override
{
83 static void log_subop_stats(
85 OpRequestRef op
, int subop
)
87 utime_t now
= ceph_clock_now();
88 utime_t latency
= now
;
89 latency
-= op
->get_req()->get_recv_stamp();
92 logger
->inc(l_osd_sop
);
93 logger
->tinc(l_osd_sop_lat
, latency
);
96 if (subop
!= l_osd_sop_pull
) {
97 uint64_t inb
= op
->get_req()->get_data().length();
98 logger
->inc(l_osd_sop_inb
, inb
);
99 if (subop
== l_osd_sop_w
) {
100 logger
->inc(l_osd_sop_w_inb
, inb
);
101 logger
->tinc(l_osd_sop_w_lat
, latency
);
102 } else if (subop
== l_osd_sop_push
) {
103 logger
->inc(l_osd_sop_push_inb
, inb
);
104 logger
->tinc(l_osd_sop_push_lat
, latency
);
106 assert("no support subop" == 0);
108 logger
->tinc(l_osd_sop_pull_lat
, latency
);
112 ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener
*pg
,
115 ObjectStore::CollectionHandle
&c
,
118 PGBackend(cct
, pg
, store
, coll
, c
) {}
120 void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle
*_h
,
124 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
125 send_pushes(priority
, h
->pushes
);
126 send_pulls(priority
, h
->pulls
);
127 send_recovery_deletes(priority
, h
->deletes
);
131 int ReplicatedBackend::recover_object(
132 const hobject_t
&hoid
,
134 ObjectContextRef head
,
135 ObjectContextRef obc
,
139 dout(10) << __func__
<< ": " << hoid
<< dendl
;
140 RPGHandle
*h
= static_cast<RPGHandle
*>(_h
);
141 if (get_parent()->get_local_missing().is_missing(hoid
)) {
151 int started
= start_pushes(
156 pushing
[hoid
].clear();
163 void ReplicatedBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
165 for(map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= pull_from_peer
.begin();
166 i
!= pull_from_peer
.end();
168 if (osdmap
->is_down(i
->first
.osd
)) {
169 dout(10) << "check_recovery_sources resetting pulls from osd." << i
->first
170 << ", osdmap has it marked down" << dendl
;
171 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
172 j
!= i
->second
.end();
174 get_parent()->cancel_pull(*j
);
175 clear_pull(pulling
.find(*j
), false);
177 pull_from_peer
.erase(i
++);
184 bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op
)
186 dout(10) << __func__
<< ": " << op
<< dendl
;
187 switch (op
->get_req()->get_type()) {
188 case MSG_OSD_PG_PULL
:
195 bool ReplicatedBackend::_handle_message(
199 dout(10) << __func__
<< ": " << op
<< dendl
;
200 switch (op
->get_req()->get_type()) {
201 case MSG_OSD_PG_PUSH
:
205 case MSG_OSD_PG_PULL
:
209 case MSG_OSD_PG_PUSH_REPLY
:
213 case MSG_OSD_SUBOP
: {
214 const MOSDSubOp
*m
= static_cast<const MOSDSubOp
*>(op
->get_req());
215 if (m
->ops
.size() == 0) {
221 case MSG_OSD_REPOP
: {
226 case MSG_OSD_REPOPREPLY
: {
237 void ReplicatedBackend::clear_recovery_state()
239 // clear pushing/pulling maps
240 for (auto &&i
: pushing
) {
241 for (auto &&j
: i
.second
) {
242 get_parent()->release_locks(j
.second
.lock_manager
);
247 for (auto &&i
: pulling
) {
248 get_parent()->release_locks(i
.second
.lock_manager
);
251 pull_from_peer
.clear();
254 void ReplicatedBackend::on_change()
256 dout(10) << __func__
<< dendl
;
257 for (map
<ceph_tid_t
, InProgressOp
>::iterator i
= in_progress_ops
.begin();
258 i
!= in_progress_ops
.end();
259 in_progress_ops
.erase(i
++)) {
260 if (i
->second
.on_commit
)
261 delete i
->second
.on_commit
;
262 if (i
->second
.on_applied
)
263 delete i
->second
.on_applied
;
265 clear_recovery_state();
268 void ReplicatedBackend::on_flushed()
272 int ReplicatedBackend::objects_read_sync(
273 const hobject_t
&hoid
,
279 return store
->read(ch
, ghobject_t(hoid
), off
, len
, *bl
, op_flags
);
282 struct AsyncReadCallback
: public GenContext
<ThreadPool::TPHandle
&> {
285 AsyncReadCallback(int r
, Context
*c
) : r(r
), c(c
) {}
286 void finish(ThreadPool::TPHandle
&) override
{
290 ~AsyncReadCallback() override
{
294 void ReplicatedBackend::objects_read_async(
295 const hobject_t
&hoid
,
296 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
297 pair
<bufferlist
*, Context
*> > > &to_read
,
298 Context
*on_complete
,
301 // There is no fast read implementation for replication backend yet
305 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
306 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
308 i
!= to_read
.end() && r
>= 0;
310 int _r
= store
->read(ch
, ghobject_t(hoid
), i
->first
.get
<0>(),
311 i
->first
.get
<1>(), *(i
->second
.first
),
313 if (i
->second
.second
) {
314 get_parent()->schedule_recovery_work(
315 get_parent()->bless_gencontext(
316 new AsyncReadCallback(_r
, i
->second
.second
)));
321 get_parent()->schedule_recovery_work(
322 get_parent()->bless_gencontext(
323 new AsyncReadCallback(r
, on_complete
)));
326 class C_OSD_OnOpCommit
: public Context
{
327 ReplicatedBackend
*pg
;
328 ReplicatedBackend::InProgressOp
*op
;
330 C_OSD_OnOpCommit(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
332 void finish(int) override
{
337 class C_OSD_OnOpApplied
: public Context
{
338 ReplicatedBackend
*pg
;
339 ReplicatedBackend::InProgressOp
*op
;
341 C_OSD_OnOpApplied(ReplicatedBackend
*pg
, ReplicatedBackend::InProgressOp
*op
)
343 void finish(int) override
{
348 void generate_transaction(
349 PGTransactionUPtr
&pgt
,
351 bool legacy_log_entries
,
352 vector
<pg_log_entry_t
> &log_entries
,
353 ObjectStore::Transaction
*t
,
354 set
<hobject_t
> *added
,
355 set
<hobject_t
> *removed
)
361 for (auto &&le
: log_entries
) {
362 le
.mark_unrollbackable();
363 auto oiter
= pgt
->op_map
.find(le
.soid
);
364 if (oiter
!= pgt
->op_map
.end() && oiter
->second
.updated_snaps
) {
365 bufferlist
bl(oiter
->second
.updated_snaps
->second
.size() * 8 + 8);
366 ::encode(oiter
->second
.updated_snaps
->second
, bl
);
368 le
.snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
372 pgt
->safe_create_traverse(
373 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &obj_op
) {
374 const hobject_t
&oid
= obj_op
.first
;
375 const ghobject_t goid
=
376 ghobject_t(oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
);
377 const PGTransaction::ObjectOperation
&op
= obj_op
.second
;
380 if (op
.is_fresh_object()) {
382 } else if (op
.is_delete()) {
383 removed
->insert(oid
);
387 if (op
.delete_first
) {
388 t
->remove(coll
, goid
);
393 [&](const PGTransaction::ObjectOperation::Init::None
&) {
395 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
396 t
->touch(coll
, goid
);
398 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
402 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
405 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
406 assert(op
.source
.is_temp());
407 t
->collection_move_rename(
410 op
.source
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
416 t
->truncate(coll
, goid
, op
.truncate
->first
);
417 if (op
.truncate
->first
!= op
.truncate
->second
)
418 t
->truncate(coll
, goid
, op
.truncate
->second
);
421 if (!op
.attr_updates
.empty()) {
422 map
<string
, bufferlist
> attrs
;
423 for (auto &&p
: op
.attr_updates
) {
425 attrs
[p
.first
] = *(p
.second
);
427 t
->rmattr(coll
, goid
, p
.first
);
429 t
->setattrs(coll
, goid
, attrs
);
433 t
->omap_clear(coll
, goid
);
435 t
->omap_setheader(coll
, goid
, *(op
.omap_header
));
437 for (auto &&up
: op
.omap_updates
) {
438 using UpdateType
= PGTransaction::ObjectOperation::OmapUpdateType
;
440 case UpdateType::Remove
:
441 t
->omap_rmkeys(coll
, goid
, up
.second
);
443 case UpdateType::Insert
:
444 t
->omap_setkeys(coll
, goid
, up
.second
);
449 // updated_snaps doesn't matter since we marked unrollbackable
452 auto &hint
= *(op
.alloc_hint
);
456 hint
.expected_object_size
,
457 hint
.expected_write_size
,
461 for (auto &&extent
: op
.buffer_updates
) {
462 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
465 [&](const BufferUpdate::Write
&op
) {
473 [&](const BufferUpdate::Zero
&op
) {
480 [&](const BufferUpdate::CloneRange
&op
) {
481 assert(op
.len
== extent
.get_len());
484 ghobject_t(op
.from
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
),
494 void ReplicatedBackend::submit_transaction(
495 const hobject_t
&soid
,
496 const object_stat_sum_t
&delta_stats
,
497 const eversion_t
&at_version
,
498 PGTransactionUPtr
&&_t
,
499 const eversion_t
&trim_to
,
500 const eversion_t
&roll_forward_to
,
501 const vector
<pg_log_entry_t
> &_log_entries
,
502 boost::optional
<pg_hit_set_history_t
> &hset_history
,
503 Context
*on_local_applied_sync
,
504 Context
*on_all_acked
,
505 Context
*on_all_commit
,
508 OpRequestRef orig_op
)
514 vector
<pg_log_entry_t
> log_entries(_log_entries
);
515 ObjectStore::Transaction op_t
;
516 PGTransactionUPtr
t(std::move(_t
));
517 set
<hobject_t
> added
, removed
;
518 generate_transaction(
521 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
526 assert(added
.size() <= 1);
527 assert(removed
.size() <= 1);
529 assert(!in_progress_ops
.count(tid
));
530 InProgressOp
&op
= in_progress_ops
.insert(
534 tid
, on_all_commit
, on_all_acked
,
539 op
.waiting_for_applied
.insert(
540 parent
->get_actingbackfill_shards().begin(),
541 parent
->get_actingbackfill_shards().end());
542 op
.waiting_for_commit
.insert(
543 parent
->get_actingbackfill_shards().begin(),
544 parent
->get_actingbackfill_shards().end());
553 added
.size() ? *(added
.begin()) : hobject_t(),
554 removed
.size() ? *(removed
.begin()) : hobject_t(),
560 add_temp_objs(added
);
561 clear_temp_objs(removed
);
563 parent
->log_operation(
571 op_t
.register_on_applied_sync(on_local_applied_sync
);
572 op_t
.register_on_applied(
573 parent
->bless_context(
574 new C_OSD_OnOpApplied(this, &op
)));
575 op_t
.register_on_commit(
576 parent
->bless_context(
577 new C_OSD_OnOpCommit(this, &op
)));
579 vector
<ObjectStore::Transaction
> tls
;
580 tls
.push_back(std::move(op_t
));
582 parent
->queue_transactions(tls
, op
.op
);
585 void ReplicatedBackend::op_applied(
589 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_APPLIED_BEGIN", true);
590 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
592 op
->op
->mark_event("op_applied");
593 op
->op
->pg_trace
.event("op applied");
596 op
->waiting_for_applied
.erase(get_parent()->whoami_shard());
597 parent
->op_applied(op
->v
);
599 if (op
->waiting_for_applied
.empty()) {
600 op
->on_applied
->complete(0);
604 assert(!op
->on_commit
&& !op
->on_applied
);
605 in_progress_ops
.erase(op
->tid
);
609 void ReplicatedBackend::op_commit(
613 OID_EVENT_TRACE_WITH_MSG((op
&& op
->op
) ? op
->op
->get_req() : NULL
, "OP_COMMIT_BEGIN", true);
614 dout(10) << __func__
<< ": " << op
->tid
<< dendl
;
616 op
->op
->mark_event("op_commit");
617 op
->op
->pg_trace
.event("op commit");
620 op
->waiting_for_commit
.erase(get_parent()->whoami_shard());
622 if (op
->waiting_for_commit
.empty()) {
623 op
->on_commit
->complete(0);
627 assert(!op
->on_commit
&& !op
->on_applied
);
628 in_progress_ops
.erase(op
->tid
);
632 void ReplicatedBackend::do_repop_reply(OpRequestRef op
)
634 static_cast<MOSDRepOpReply
*>(op
->get_nonconst_req())->finish_decode();
635 const MOSDRepOpReply
*r
= static_cast<const MOSDRepOpReply
*>(op
->get_req());
636 assert(r
->get_header().type
== MSG_OSD_REPOPREPLY
);
640 // must be replication.
641 ceph_tid_t rep_tid
= r
->get_tid();
642 pg_shard_t from
= r
->from
;
644 if (in_progress_ops
.count(rep_tid
)) {
645 map
<ceph_tid_t
, InProgressOp
>::iterator iter
=
646 in_progress_ops
.find(rep_tid
);
647 InProgressOp
&ip_op
= iter
->second
;
648 const MOSDOp
*m
= NULL
;
650 m
= static_cast<const MOSDOp
*>(ip_op
.op
->get_req());
653 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " op " //<< *m
654 << " ack_type " << (int)r
->ack_type
658 dout(7) << __func__
<< ": tid " << ip_op
.tid
<< " (no op) "
659 << " ack_type " << (int)r
->ack_type
665 if (r
->ack_type
& CEPH_OSD_FLAG_ONDISK
) {
666 assert(ip_op
.waiting_for_commit
.count(from
));
667 ip_op
.waiting_for_commit
.erase(from
);
670 ss
<< "sub_op_commit_rec from " << from
;
671 ip_op
.op
->mark_event_string(ss
.str());
672 ip_op
.op
->pg_trace
.event("sub_op_commit_rec");
675 assert(ip_op
.waiting_for_applied
.count(from
));
678 ss
<< "sub_op_applied_rec from " << from
;
679 ip_op
.op
->mark_event_string(ss
.str());
680 ip_op
.op
->pg_trace
.event("sub_op_applied_rec");
683 ip_op
.waiting_for_applied
.erase(from
);
685 parent
->update_peer_last_complete_ondisk(
687 r
->get_last_complete_ondisk());
689 if (ip_op
.waiting_for_applied
.empty() &&
691 ip_op
.on_applied
->complete(0);
692 ip_op
.on_applied
= 0;
694 if (ip_op
.waiting_for_commit
.empty() &&
696 ip_op
.on_commit
->complete(0);
700 assert(!ip_op
.on_commit
&& !ip_op
.on_applied
);
701 in_progress_ops
.erase(iter
);
706 int ReplicatedBackend::be_deep_scrub(
707 const hobject_t
&poid
,
709 ScrubMapBuilder
&pos
,
712 dout(10) << __func__
<< " " << poid
<< " pos " << pos
<< dendl
;
714 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
|
715 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
718 sleeptime
.set_from_double(cct
->_conf
->osd_debug_deep_scrub_sleep
);
719 if (sleeptime
!= utime_t()) {
720 lgeneric_derr(cct
) << __func__
<< " sleeping for " << sleeptime
<< dendl
;
724 assert(poid
== pos
.ls
[pos
.pos
]);
725 if (!pos
.data_done()) {
726 if (pos
.data_pos
== 0) {
727 pos
.data_hash
= bufferhash(-1);
734 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
736 cct
->_conf
->osd_deep_scrub_stride
, bl
,
739 dout(20) << __func__
<< " " << poid
<< " got "
740 << r
<< " on read, read_error" << dendl
;
748 if (r
== cct
->_conf
->osd_deep_scrub_stride
) {
749 dout(20) << __func__
<< " " << poid
<< " more data, digest so far 0x"
750 << std::hex
<< pos
.data_hash
.digest() << std::dec
<< dendl
;
755 o
.digest
= pos
.data_hash
.digest();
756 o
.digest_present
= true;
757 dout(20) << __func__
<< " " << poid
<< " done with data, digest 0x"
758 << std::hex
<< o
.digest
<< std::dec
<< dendl
;
762 if (pos
.omap_pos
.empty()) {
763 pos
.omap_hash
= bufferhash(-1);
766 r
= store
->omap_get_header(
769 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
772 dout(20) << __func__
<< " " << poid
<< " got "
773 << r
<< " on omap header read, read_error" << dendl
;
777 if (r
== 0 && hdrbl
.length()) {
778 dout(25) << "CRC header " << string(hdrbl
.c_str(), hdrbl
.length())
780 pos
.omap_hash
<< hdrbl
;
785 ObjectMap::ObjectMapIterator iter
= store
->get_omap_iterator(
788 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
790 if (pos
.omap_pos
.length()) {
791 iter
->lower_bound(pos
.omap_pos
);
793 iter
->seek_to_first();
795 int max
= g_conf
->osd_deep_scrub_keys
;
796 while (iter
->status() == 0 && iter
->valid()) {
797 pos
.omap_bytes
+= iter
->value().length();
800 // fixme: we can do this more efficiently.
802 ::encode(iter
->key(), bl
);
803 ::encode(iter
->value(), bl
);
808 if (iter
->valid() && max
== 0) {
809 pos
.omap_pos
= iter
->key();
812 if (iter
->status() < 0) {
813 dout(25) << __func__
<< " " << poid
814 << " on omap scan, db status error" << dendl
;
820 if (pos
.omap_keys
> cct
->_conf
->
821 osd_deep_scrub_large_omap_object_key_threshold
||
822 pos
.omap_bytes
> cct
->_conf
->
823 osd_deep_scrub_large_omap_object_value_sum_threshold
) {
824 dout(25) << __func__
<< " " << poid
825 << " large omap object detected. Object has " << pos
.omap_keys
826 << " keys and size " << pos
.omap_bytes
<< " bytes" << dendl
;
827 o
.large_omap_object_found
= true;
828 o
.large_omap_object_key_count
= pos
.omap_keys
;
829 o
.large_omap_object_value_size
= pos
.omap_bytes
;
830 map
.has_large_omap_object_errors
= true;
833 o
.omap_digest
= pos
.omap_hash
.digest();
834 o
.omap_digest_present
= true;
835 dout(20) << __func__
<< " done with " << poid
<< " omap_digest "
836 << std::hex
<< o
.omap_digest
<< std::dec
<< dendl
;
842 void ReplicatedBackend::_do_push(OpRequestRef op
)
844 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
845 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
846 pg_shard_t from
= m
->from
;
850 vector
<PushReplyOp
> replies
;
851 ObjectStore::Transaction t
;
853 if (get_parent()->check_failsafe_full(ss
)) {
854 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
857 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
858 i
!= m
->pushes
.end();
860 replies
.push_back(PushReplyOp());
861 handle_push(from
, *i
, &(replies
.back()), &t
);
864 MOSDPGPushReply
*reply
= new MOSDPGPushReply
;
865 reply
->from
= get_parent()->whoami_shard();
866 reply
->set_priority(m
->get_priority());
867 reply
->pgid
= get_info().pgid
;
868 reply
->map_epoch
= m
->map_epoch
;
869 reply
->min_epoch
= m
->min_epoch
;
870 reply
->replies
.swap(replies
);
871 reply
->compute_cost(cct
);
873 t
.register_on_complete(
874 new PG_SendMessageOnConn(
875 get_parent(), reply
, m
->get_connection()));
877 get_parent()->queue_transaction(std::move(t
));
880 struct C_ReplicatedBackend_OnPullComplete
: GenContext
<ThreadPool::TPHandle
&> {
881 ReplicatedBackend
*bc
;
882 list
<ReplicatedBackend::pull_complete_info
> to_continue
;
884 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend
*bc
, int priority
)
885 : bc(bc
), priority(priority
) {}
887 void finish(ThreadPool::TPHandle
&handle
) override
{
888 ReplicatedBackend::RPGHandle
*h
= bc
->_open_recovery_op();
889 for (auto &&i
: to_continue
) {
890 auto j
= bc
->pulling
.find(i
.hoid
);
891 assert(j
!= bc
->pulling
.end());
892 ObjectContextRef obc
= j
->second
.obc
;
893 bc
->clear_pull(j
, false /* already did it */);
894 int started
= bc
->start_pushes(i
.hoid
, obc
, h
);
896 bc
->pushing
[i
.hoid
].clear();
897 bc
->get_parent()->primary_failed(i
.hoid
);
898 bc
->get_parent()->primary_error(i
.hoid
, obc
->obs
.oi
.version
);
899 } else if (!started
) {
900 bc
->get_parent()->on_global_recover(
901 i
.hoid
, i
.stat
, false);
903 handle
.reset_tp_timeout();
905 bc
->run_recovery_op(h
, priority
);
909 void ReplicatedBackend::_do_pull_response(OpRequestRef op
)
911 const MOSDPGPush
*m
= static_cast<const MOSDPGPush
*>(op
->get_req());
912 assert(m
->get_type() == MSG_OSD_PG_PUSH
);
913 pg_shard_t from
= m
->from
;
917 vector
<PullOp
> replies(1);
920 if (get_parent()->check_failsafe_full(ss
)) {
921 dout(10) << __func__
<< " Out of space (failsafe) processing pull response (push): " << ss
.str() << dendl
;
925 ObjectStore::Transaction t
;
926 list
<pull_complete_info
> to_continue
;
927 for (vector
<PushOp
>::const_iterator i
= m
->pushes
.begin();
928 i
!= m
->pushes
.end();
930 bool more
= handle_pull_response(from
, *i
, &(replies
.back()), &to_continue
, &t
);
932 replies
.push_back(PullOp());
934 if (!to_continue
.empty()) {
935 C_ReplicatedBackend_OnPullComplete
*c
=
936 new C_ReplicatedBackend_OnPullComplete(
939 c
->to_continue
.swap(to_continue
);
940 t
.register_on_complete(
941 new PG_RecoveryQueueAsync(
943 get_parent()->bless_gencontext(c
)));
945 replies
.erase(replies
.end() - 1);
947 if (replies
.size()) {
948 MOSDPGPull
*reply
= new MOSDPGPull
;
949 reply
->from
= parent
->whoami_shard();
950 reply
->set_priority(m
->get_priority());
951 reply
->pgid
= get_info().pgid
;
952 reply
->map_epoch
= m
->map_epoch
;
953 reply
->min_epoch
= m
->min_epoch
;
954 reply
->set_pulls(&replies
);
955 reply
->compute_cost(cct
);
957 t
.register_on_complete(
958 new PG_SendMessageOnConn(
959 get_parent(), reply
, m
->get_connection()));
962 get_parent()->queue_transaction(std::move(t
));
965 void ReplicatedBackend::do_pull(OpRequestRef op
)
967 MOSDPGPull
*m
= static_cast<MOSDPGPull
*>(op
->get_nonconst_req());
968 assert(m
->get_type() == MSG_OSD_PG_PULL
);
969 pg_shard_t from
= m
->from
;
971 map
<pg_shard_t
, vector
<PushOp
> > replies
;
972 vector
<PullOp
> pulls
;
973 m
->take_pulls(&pulls
);
974 for (auto& i
: pulls
) {
975 replies
[from
].push_back(PushOp());
976 handle_pull(from
, i
, &(replies
[from
].back()));
978 send_pushes(m
->get_priority(), replies
);
981 void ReplicatedBackend::do_push_reply(OpRequestRef op
)
983 const MOSDPGPushReply
*m
= static_cast<const MOSDPGPushReply
*>(op
->get_req());
984 assert(m
->get_type() == MSG_OSD_PG_PUSH_REPLY
);
985 pg_shard_t from
= m
->from
;
987 vector
<PushOp
> replies(1);
988 for (vector
<PushReplyOp
>::const_iterator i
= m
->replies
.begin();
989 i
!= m
->replies
.end();
991 bool more
= handle_push_reply(from
, *i
, &(replies
.back()));
993 replies
.push_back(PushOp());
995 replies
.erase(replies
.end() - 1);
997 map
<pg_shard_t
, vector
<PushOp
> > _replies
;
998 _replies
[from
].swap(replies
);
999 send_pushes(m
->get_priority(), _replies
);
1002 Message
* ReplicatedBackend::generate_subop(
1003 const hobject_t
&soid
,
1004 const eversion_t
&at_version
,
1007 eversion_t pg_trim_to
,
1008 eversion_t pg_roll_forward_to
,
1009 hobject_t new_temp_oid
,
1010 hobject_t discard_temp_oid
,
1011 const vector
<pg_log_entry_t
> &log_entries
,
1012 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1013 ObjectStore::Transaction
&op_t
,
1015 const pg_info_t
&pinfo
)
1017 int acks_wanted
= CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
;
1018 // forward the write/update/whatever
1019 MOSDRepOp
*wr
= new MOSDRepOp(
1020 reqid
, parent
->whoami_shard(),
1021 spg_t(get_info().pgid
.pgid
, peer
.shard
),
1023 get_osdmap()->get_epoch(),
1024 parent
->get_last_peering_reset_epoch(),
1027 // ship resulting transaction, log entries, and pg_stats
1028 if (!parent
->should_send_op(peer
, soid
)) {
1029 dout(10) << "issue_repop shipping empty opt to osd." << peer
1030 <<", object " << soid
1031 << " beyond MAX(last_backfill_started "
1032 << ", pinfo.last_backfill "
1033 << pinfo
.last_backfill
<< ")" << dendl
;
1034 ObjectStore::Transaction t
;
1035 ::encode(t
, wr
->get_data());
1037 ::encode(op_t
, wr
->get_data());
1038 wr
->get_header().data_off
= op_t
.get_data_alignment();
1041 ::encode(log_entries
, wr
->logbl
);
1043 if (pinfo
.is_incomplete())
1044 wr
->pg_stats
= pinfo
.stats
; // reflects backfill progress
1046 wr
->pg_stats
= get_info().stats
;
1048 wr
->pg_trim_to
= pg_trim_to
;
1049 wr
->pg_roll_forward_to
= pg_roll_forward_to
;
1051 wr
->new_temp_oid
= new_temp_oid
;
1052 wr
->discard_temp_oid
= discard_temp_oid
;
1053 wr
->updated_hit_set_history
= hset_hist
;
1057 void ReplicatedBackend::issue_op(
1058 const hobject_t
&soid
,
1059 const eversion_t
&at_version
,
1062 eversion_t pg_trim_to
,
1063 eversion_t pg_roll_forward_to
,
1064 hobject_t new_temp_oid
,
1065 hobject_t discard_temp_oid
,
1066 const vector
<pg_log_entry_t
> &log_entries
,
1067 boost::optional
<pg_hit_set_history_t
> &hset_hist
,
1069 ObjectStore::Transaction
&op_t
)
1072 op
->op
->pg_trace
.event("issue replication ops");
1074 if (parent
->get_actingbackfill_shards().size() > 1) {
1076 set
<pg_shard_t
> replicas
= parent
->get_actingbackfill_shards();
1077 replicas
.erase(parent
->whoami_shard());
1078 ss
<< "waiting for subops from " << replicas
;
1080 op
->op
->mark_sub_op_sent(ss
.str());
1082 for (set
<pg_shard_t
>::const_iterator i
=
1083 parent
->get_actingbackfill_shards().begin();
1084 i
!= parent
->get_actingbackfill_shards().end();
1086 if (*i
== parent
->whoami_shard()) continue;
1087 pg_shard_t peer
= *i
;
1088 const pg_info_t
&pinfo
= parent
->get_shard_info().find(peer
)->second
;
1091 wr
= generate_subop(
1106 wr
->trace
.init("replicated op", nullptr, &op
->op
->pg_trace
);
1107 get_parent()->send_message_osd_cluster(
1108 peer
.osd
, wr
, get_osdmap()->get_epoch());
1113 void ReplicatedBackend::do_repop(OpRequestRef op
)
1115 static_cast<MOSDRepOp
*>(op
->get_nonconst_req())->finish_decode();
1116 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(op
->get_req());
1117 int msg_type
= m
->get_type();
1118 assert(MSG_OSD_REPOP
== msg_type
);
1120 const hobject_t
& soid
= m
->poid
;
1122 dout(10) << __func__
<< " " << soid
1123 << " v " << m
->version
1124 << (m
->logbl
.length() ? " (transaction)" : " (parallel exec")
1125 << " " << m
->logbl
.length()
1129 assert(m
->map_epoch
>= get_info().history
.same_interval_since
);
1131 // we better not be missing this.
1132 assert(!parent
->get_log().get_missing().is_missing(soid
));
1134 parent
->maybe_preempt_replica_scrub(soid
);
1136 int ackerosd
= m
->get_source().num();
1140 RepModifyRef
rm(std::make_shared
<RepModify
>());
1142 rm
->ackerosd
= ackerosd
;
1143 rm
->last_complete
= get_info().last_complete
;
1144 rm
->epoch_started
= get_osdmap()->get_epoch();
1146 assert(m
->logbl
.length());
1147 // shipped transaction and log entries
1148 vector
<pg_log_entry_t
> log
;
1150 bufferlist::iterator p
= const_cast<bufferlist
&>(m
->get_data()).begin();
1151 ::decode(rm
->opt
, p
);
1153 if (m
->new_temp_oid
!= hobject_t()) {
1154 dout(20) << __func__
<< " start tracking temp " << m
->new_temp_oid
<< dendl
;
1155 add_temp_obj(m
->new_temp_oid
);
1157 if (m
->discard_temp_oid
!= hobject_t()) {
1158 dout(20) << __func__
<< " stop tracking temp " << m
->discard_temp_oid
<< dendl
;
1159 if (rm
->opt
.empty()) {
1160 dout(10) << __func__
<< ": removing object " << m
->discard_temp_oid
1161 << " since we won't get the transaction" << dendl
;
1162 rm
->localt
.remove(coll
, ghobject_t(m
->discard_temp_oid
));
1164 clear_temp_obj(m
->discard_temp_oid
);
1167 p
= const_cast<bufferlist
&>(m
->logbl
).begin();
1169 rm
->opt
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
1171 bool update_snaps
= false;
1172 if (!rm
->opt
.empty()) {
1173 // If the opt is non-empty, we infer we are before
1174 // last_backfill (according to the primary, not our
1175 // not-quite-accurate value), and should update the
1176 // collections now. Otherwise, we do it later on push.
1177 update_snaps
= true;
1179 parent
->update_stats(m
->pg_stats
);
1180 parent
->log_operation(
1182 m
->updated_hit_set_history
,
1184 m
->pg_roll_forward_to
,
1188 rm
->opt
.register_on_commit(
1189 parent
->bless_context(
1190 new C_OSD_RepModifyCommit(this, rm
)));
1191 rm
->localt
.register_on_applied(
1192 parent
->bless_context(
1193 new C_OSD_RepModifyApply(this, rm
)));
1194 vector
<ObjectStore::Transaction
> tls
;
1196 tls
.push_back(std::move(rm
->localt
));
1197 tls
.push_back(std::move(rm
->opt
));
1198 parent
->queue_transactions(tls
, op
);
1199 // op is cleaned up by oncommit/onapply when both are executed
1202 void ReplicatedBackend::repop_applied(RepModifyRef rm
)
1204 rm
->op
->mark_event("sub_op_applied");
1206 rm
->op
->pg_trace
.event("sup_op_applied");
1208 dout(10) << __func__
<< " on " << rm
<< " op "
1209 << *rm
->op
->get_req() << dendl
;
1210 const Message
*m
= rm
->op
->get_req();
1211 const MOSDRepOp
*req
= static_cast<const MOSDRepOp
*>(m
);
1212 eversion_t version
= req
->version
;
1214 // send ack to acker only if we haven't sent a commit already
1215 if (!rm
->committed
) {
1216 Message
*ack
= new MOSDRepOpReply(
1217 req
, parent
->whoami_shard(),
1218 0, get_osdmap()->get_epoch(), req
->min_epoch
, CEPH_OSD_FLAG_ACK
);
1219 ack
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match commit priority!
1220 ack
->trace
= rm
->op
->pg_trace
;
1221 get_parent()->send_message_osd_cluster(
1222 rm
->ackerosd
, ack
, get_osdmap()->get_epoch());
1225 parent
->op_applied(version
);
1228 void ReplicatedBackend::repop_commit(RepModifyRef rm
)
1230 rm
->op
->mark_commit_sent();
1231 rm
->op
->pg_trace
.event("sup_op_commit");
1232 rm
->committed
= true;
1235 const MOSDRepOp
*m
= static_cast<const MOSDRepOp
*>(rm
->op
->get_req());
1236 assert(m
->get_type() == MSG_OSD_REPOP
);
1237 dout(10) << __func__
<< " on op " << *m
1238 << ", sending commit to osd." << rm
->ackerosd
1240 assert(get_osdmap()->is_up(rm
->ackerosd
));
1242 get_parent()->update_last_complete_ondisk(rm
->last_complete
);
1244 MOSDRepOpReply
*reply
= new MOSDRepOpReply(
1246 get_parent()->whoami_shard(),
1247 0, get_osdmap()->get_epoch(), m
->get_min_epoch(), CEPH_OSD_FLAG_ONDISK
);
1248 reply
->set_last_complete_ondisk(rm
->last_complete
);
1249 reply
->set_priority(CEPH_MSG_PRIO_HIGH
); // this better match ack priority!
1250 reply
->trace
= rm
->op
->pg_trace
;
1251 get_parent()->send_message_osd_cluster(
1252 rm
->ackerosd
, reply
, get_osdmap()->get_epoch());
1254 log_subop_stats(get_parent()->get_logger(), rm
->op
, l_osd_sop_w
);
1258 // ===========================================================
1260 void ReplicatedBackend::calc_head_subsets(
1261 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
1262 const pg_missing_t
& missing
,
1263 const hobject_t
&last_backfill
,
1264 interval_set
<uint64_t>& data_subset
,
1265 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1266 ObcLockManager
&manager
)
1268 dout(10) << "calc_head_subsets " << head
1269 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1271 uint64_t size
= obc
->obs
.oi
.size
;
1273 data_subset
.insert(0, size
);
1275 if (get_parent()->get_pool().allow_incomplete_clones()) {
1276 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1280 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1281 dout(10) << "calc_head_subsets " << head
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1286 interval_set
<uint64_t> cloning
;
1287 interval_set
<uint64_t> prev
;
1289 prev
.insert(0, size
);
1291 for (int j
=snapset
.clones
.size()-1; j
>=0; j
--) {
1293 c
.snap
= snapset
.clones
[j
];
1294 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1295 if (!missing
.is_missing(c
) &&
1296 c
< last_backfill
&&
1297 get_parent()->try_lock_for_read(c
, manager
)) {
1298 dout(10) << "calc_head_subsets " << head
<< " has prev " << c
1299 << " overlap " << prev
<< dendl
;
1300 clone_subsets
[c
] = prev
;
1301 cloning
.union_of(prev
);
1304 dout(10) << "calc_head_subsets " << head
<< " does not have prev " << c
1305 << " overlap " << prev
<< dendl
;
1309 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1310 dout(10) << "skipping clone, too many holes" << dendl
;
1311 get_parent()->release_locks(manager
);
1312 clone_subsets
.clear();
1316 // what's left for us to push?
1317 data_subset
.subtract(cloning
);
1319 dout(10) << "calc_head_subsets " << head
1320 << " data_subset " << data_subset
1321 << " clone_subsets " << clone_subsets
<< dendl
;
1324 void ReplicatedBackend::calc_clone_subsets(
1325 SnapSet
& snapset
, const hobject_t
& soid
,
1326 const pg_missing_t
& missing
,
1327 const hobject_t
&last_backfill
,
1328 interval_set
<uint64_t>& data_subset
,
1329 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1330 ObcLockManager
&manager
)
1332 dout(10) << "calc_clone_subsets " << soid
1333 << " clone_overlap " << snapset
.clone_overlap
<< dendl
;
1335 uint64_t size
= snapset
.clone_size
[soid
.snap
];
1337 data_subset
.insert(0, size
);
1339 if (get_parent()->get_pool().allow_incomplete_clones()) {
1340 dout(10) << __func__
<< ": caching (was) enabled, skipping clone subsets" << dendl
;
1344 if (!cct
->_conf
->osd_recover_clone_overlap
) {
1345 dout(10) << "calc_clone_subsets " << soid
<< " -- osd_recover_clone_overlap disabled" << dendl
;
1350 for (i
=0; i
< snapset
.clones
.size(); i
++)
1351 if (snapset
.clones
[i
] == soid
.snap
)
1354 // any overlap with next older clone?
1355 interval_set
<uint64_t> cloning
;
1356 interval_set
<uint64_t> prev
;
1358 prev
.insert(0, size
);
1359 for (int j
=i
-1; j
>=0; j
--) {
1361 c
.snap
= snapset
.clones
[j
];
1362 prev
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
]]);
1363 if (!missing
.is_missing(c
) &&
1364 c
< last_backfill
&&
1365 get_parent()->try_lock_for_read(c
, manager
)) {
1366 dout(10) << "calc_clone_subsets " << soid
<< " has prev " << c
1367 << " overlap " << prev
<< dendl
;
1368 clone_subsets
[c
] = prev
;
1369 cloning
.union_of(prev
);
1372 dout(10) << "calc_clone_subsets " << soid
<< " does not have prev " << c
1373 << " overlap " << prev
<< dendl
;
1376 // overlap with next newest?
1377 interval_set
<uint64_t> next
;
1379 next
.insert(0, size
);
1380 for (unsigned j
=i
+1; j
<snapset
.clones
.size(); j
++) {
1382 c
.snap
= snapset
.clones
[j
];
1383 next
.intersection_of(snapset
.clone_overlap
[snapset
.clones
[j
-1]]);
1384 if (!missing
.is_missing(c
) &&
1385 c
< last_backfill
&&
1386 get_parent()->try_lock_for_read(c
, manager
)) {
1387 dout(10) << "calc_clone_subsets " << soid
<< " has next " << c
1388 << " overlap " << next
<< dendl
;
1389 clone_subsets
[c
] = next
;
1390 cloning
.union_of(next
);
1393 dout(10) << "calc_clone_subsets " << soid
<< " does not have next " << c
1394 << " overlap " << next
<< dendl
;
1397 if (cloning
.num_intervals() > cct
->_conf
->osd_recover_clone_overlap_limit
) {
1398 dout(10) << "skipping clone, too many holes" << dendl
;
1399 get_parent()->release_locks(manager
);
1400 clone_subsets
.clear();
1405 // what's left for us to push?
1406 data_subset
.subtract(cloning
);
1408 dout(10) << "calc_clone_subsets " << soid
1409 << " data_subset " << data_subset
1410 << " clone_subsets " << clone_subsets
<< dendl
;
1413 void ReplicatedBackend::prepare_pull(
1415 const hobject_t
& soid
,
1416 ObjectContextRef headctx
,
1419 assert(get_parent()->get_local_missing().get_items().count(soid
));
1420 eversion_t _v
= get_parent()->get_local_missing().get_items().find(
1423 const map
<hobject_t
, set
<pg_shard_t
>> &missing_loc(
1424 get_parent()->get_missing_loc_shards());
1425 const map
<pg_shard_t
, pg_missing_t
> &peer_missing(
1426 get_parent()->get_shard_missing());
1427 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator q
= missing_loc
.find(soid
);
1428 assert(q
!= missing_loc
.end());
1429 assert(!q
->second
.empty());
1432 vector
<pg_shard_t
> shuffle(q
->second
.begin(), q
->second
.end());
1433 random_shuffle(shuffle
.begin(), shuffle
.end());
1434 vector
<pg_shard_t
>::iterator p
= shuffle
.begin();
1435 assert(get_osdmap()->is_up(p
->osd
));
1436 pg_shard_t fromshard
= *p
;
1438 dout(7) << "pull " << soid
1440 << " on osds " << q
->second
1441 << " from osd." << fromshard
1444 assert(peer_missing
.count(fromshard
));
1445 const pg_missing_t
&pmissing
= peer_missing
.find(fromshard
)->second
;
1446 if (pmissing
.is_missing(soid
, v
)) {
1447 assert(pmissing
.get_items().find(soid
)->second
.have
!= v
);
1448 dout(10) << "pulling soid " << soid
<< " from osd " << fromshard
1449 << " at version " << pmissing
.get_items().find(soid
)->second
.have
1450 << " rather than at version " << v
<< dendl
;
1451 v
= pmissing
.get_items().find(soid
)->second
.have
;
1452 assert(get_parent()->get_log().get_log().objects
.count(soid
) &&
1453 (get_parent()->get_log().get_log().objects
.find(soid
)->second
->op
==
1454 pg_log_entry_t::LOST_REVERT
) &&
1455 (get_parent()->get_log().get_log().objects
.find(
1456 soid
)->second
->reverting_to
==
1460 ObjectRecoveryInfo recovery_info
;
1461 ObcLockManager lock_manager
;
1463 if (soid
.is_snap()) {
1464 assert(!get_parent()->get_local_missing().is_missing(
1466 !get_parent()->get_local_missing().is_missing(
1467 soid
.get_snapdir()));
1470 SnapSetContext
*ssc
= headctx
->ssc
;
1472 dout(10) << " snapset " << ssc
->snapset
<< dendl
;
1473 recovery_info
.ss
= ssc
->snapset
;
1475 ssc
->snapset
, soid
, get_parent()->get_local_missing(),
1476 get_info().last_backfill
,
1477 recovery_info
.copy_subset
,
1478 recovery_info
.clone_subset
,
1480 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1481 dout(10) << " pulling " << recovery_info
<< dendl
;
1483 assert(ssc
->snapset
.clone_size
.count(soid
.snap
));
1484 recovery_info
.size
= ssc
->snapset
.clone_size
[soid
.snap
];
1486 // pulling head or unversioned object.
1487 // always pull the whole thing.
1488 recovery_info
.copy_subset
.insert(0, (uint64_t)-1);
1489 recovery_info
.size
= ((uint64_t)-1);
1492 h
->pulls
[fromshard
].push_back(PullOp());
1493 PullOp
&op
= h
->pulls
[fromshard
].back();
1496 op
.recovery_info
= recovery_info
;
1497 op
.recovery_info
.soid
= soid
;
1498 op
.recovery_info
.version
= v
;
1499 op
.recovery_progress
.data_complete
= false;
1500 op
.recovery_progress
.omap_complete
= false;
1501 op
.recovery_progress
.data_recovered_to
= 0;
1502 op
.recovery_progress
.first
= true;
1504 assert(!pulling
.count(soid
));
1505 pull_from_peer
[fromshard
].insert(soid
);
1506 PullInfo
&pi
= pulling
[soid
];
1507 pi
.from
= fromshard
;
1509 pi
.head_ctx
= headctx
;
1510 pi
.recovery_info
= op
.recovery_info
;
1511 pi
.recovery_progress
= op
.recovery_progress
;
1512 pi
.cache_dont_need
= h
->cache_dont_need
;
1513 pi
.lock_manager
= std::move(lock_manager
);
1517 * intelligently push an object to a replica. make use of existing
1518 * clones/heads and dup data ranges where possible.
1520 int ReplicatedBackend::prep_push_to_replica(
1521 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
1522 PushOp
*pop
, bool cache_dont_need
)
1524 const object_info_t
& oi
= obc
->obs
.oi
;
1525 uint64_t size
= obc
->obs
.oi
.size
;
1527 dout(10) << __func__
<< ": " << soid
<< " v" << oi
.version
1528 << " size " << size
<< " to osd." << peer
<< dendl
;
1530 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1531 interval_set
<uint64_t> data_subset
;
1533 ObcLockManager lock_manager
;
1534 // are we doing a clone on the replica?
1535 if (soid
.snap
&& soid
.snap
< CEPH_NOSNAP
) {
1536 hobject_t head
= soid
;
1537 head
.snap
= CEPH_NOSNAP
;
1539 // try to base push off of clones that succeed/preceed poid
1540 // we need the head (and current SnapSet) locally to do that.
1541 if (get_parent()->get_local_missing().is_missing(head
)) {
1542 dout(15) << "push_to_replica missing head " << head
<< ", pushing raw clone" << dendl
;
1543 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1545 hobject_t snapdir
= head
;
1546 snapdir
.snap
= CEPH_SNAPDIR
;
1547 if (get_parent()->get_local_missing().is_missing(snapdir
)) {
1548 dout(15) << "push_to_replica missing snapdir " << snapdir
1549 << ", pushing raw clone" << dendl
;
1550 return prep_push(obc
, soid
, peer
, pop
, cache_dont_need
);
1553 SnapSetContext
*ssc
= obc
->ssc
;
1555 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1556 pop
->recovery_info
.ss
= ssc
->snapset
;
1557 map
<pg_shard_t
, pg_missing_t
>::const_iterator pm
=
1558 get_parent()->get_shard_missing().find(peer
);
1559 assert(pm
!= get_parent()->get_shard_missing().end());
1560 map
<pg_shard_t
, pg_info_t
>::const_iterator pi
=
1561 get_parent()->get_shard_info().find(peer
);
1562 assert(pi
!= get_parent()->get_shard_info().end());
1566 pi
->second
.last_backfill
,
1567 data_subset
, clone_subsets
,
1569 } else if (soid
.snap
== CEPH_NOSNAP
) {
1570 // pushing head or unversioned object.
1571 // base this on partially on replica's clones?
1572 SnapSetContext
*ssc
= obc
->ssc
;
1574 dout(15) << "push_to_replica snapset is " << ssc
->snapset
<< dendl
;
1577 ssc
->snapset
, soid
, get_parent()->get_shard_missing().find(peer
)->second
,
1578 get_parent()->get_shard_info().find(peer
)->second
.last_backfill
,
1579 data_subset
, clone_subsets
,
1592 std::move(lock_manager
));
1595 int ReplicatedBackend::prep_push(ObjectContextRef obc
,
1596 const hobject_t
& soid
, pg_shard_t peer
,
1597 PushOp
*pop
, bool cache_dont_need
)
1599 interval_set
<uint64_t> data_subset
;
1600 if (obc
->obs
.oi
.size
)
1601 data_subset
.insert(0, obc
->obs
.oi
.size
);
1602 map
<hobject_t
, interval_set
<uint64_t>> clone_subsets
;
1604 return prep_push(obc
, soid
, peer
,
1605 obc
->obs
.oi
.version
, data_subset
, clone_subsets
,
1606 pop
, cache_dont_need
, ObcLockManager());
1609 int ReplicatedBackend::prep_push(
1610 ObjectContextRef obc
,
1611 const hobject_t
& soid
, pg_shard_t peer
,
1613 interval_set
<uint64_t> &data_subset
,
1614 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
1616 bool cache_dont_need
,
1617 ObcLockManager
&&lock_manager
)
1619 get_parent()->begin_peer_recover(peer
, soid
);
1621 PushInfo
&pi
= pushing
[soid
][peer
];
1623 pi
.recovery_info
.size
= obc
->obs
.oi
.size
;
1624 pi
.recovery_info
.copy_subset
= data_subset
;
1625 pi
.recovery_info
.clone_subset
= clone_subsets
;
1626 pi
.recovery_info
.soid
= soid
;
1627 pi
.recovery_info
.oi
= obc
->obs
.oi
;
1628 pi
.recovery_info
.ss
= pop
->recovery_info
.ss
;
1629 pi
.recovery_info
.version
= version
;
1630 pi
.lock_manager
= std::move(lock_manager
);
1632 ObjectRecoveryProgress new_progress
;
1633 int r
= build_push_op(pi
.recovery_info
,
1634 pi
.recovery_progress
,
1637 &(pi
.stat
), cache_dont_need
);
1640 pi
.recovery_progress
= new_progress
;
1644 void ReplicatedBackend::submit_push_data(
1645 const ObjectRecoveryInfo
&recovery_info
,
1648 bool cache_dont_need
,
1649 const interval_set
<uint64_t> &intervals_included
,
1650 bufferlist data_included
,
1651 bufferlist omap_header
,
1652 const map
<string
, bufferlist
> &attrs
,
1653 const map
<string
, bufferlist
> &omap_entries
,
1654 ObjectStore::Transaction
*t
)
1656 hobject_t target_oid
;
1657 if (first
&& complete
) {
1658 target_oid
= recovery_info
.soid
;
1660 target_oid
= get_parent()->get_temp_recovery_object(recovery_info
.soid
,
1661 recovery_info
.version
);
1663 dout(10) << __func__
<< ": Adding oid "
1664 << target_oid
<< " in the temp collection" << dendl
;
1665 add_temp_obj(target_oid
);
1670 t
->remove(coll
, ghobject_t(target_oid
));
1671 t
->touch(coll
, ghobject_t(target_oid
));
1672 t
->truncate(coll
, ghobject_t(target_oid
), recovery_info
.size
);
1673 if (omap_header
.length())
1674 t
->omap_setheader(coll
, ghobject_t(target_oid
), omap_header
);
1676 bufferlist bv
= attrs
.at(OI_ATTR
);
1677 object_info_t
oi(bv
);
1678 t
->set_alloc_hint(coll
, ghobject_t(target_oid
),
1679 oi
.expected_object_size
,
1680 oi
.expected_write_size
,
1681 oi
.alloc_hint_flags
);
1684 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
;
1685 if (cache_dont_need
)
1686 fadvise_flags
|= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
1687 for (interval_set
<uint64_t>::const_iterator p
= intervals_included
.begin();
1688 p
!= intervals_included
.end();
1691 bit
.substr_of(data_included
, off
, p
.get_len());
1692 t
->write(coll
, ghobject_t(target_oid
),
1693 p
.get_start(), p
.get_len(), bit
, fadvise_flags
);
1697 if (!omap_entries
.empty())
1698 t
->omap_setkeys(coll
, ghobject_t(target_oid
), omap_entries
);
1700 t
->setattrs(coll
, ghobject_t(target_oid
), attrs
);
1704 dout(10) << __func__
<< ": Removing oid "
1705 << target_oid
<< " from the temp collection" << dendl
;
1706 clear_temp_obj(target_oid
);
1707 t
->remove(coll
, ghobject_t(recovery_info
.soid
));
1708 t
->collection_move_rename(coll
, ghobject_t(target_oid
),
1709 coll
, ghobject_t(recovery_info
.soid
));
1712 submit_push_complete(recovery_info
, t
);
1716 void ReplicatedBackend::submit_push_complete(
1717 const ObjectRecoveryInfo
&recovery_info
,
1718 ObjectStore::Transaction
*t
)
1720 for (map
<hobject_t
, interval_set
<uint64_t>>::const_iterator p
=
1721 recovery_info
.clone_subset
.begin();
1722 p
!= recovery_info
.clone_subset
.end();
1724 for (interval_set
<uint64_t>::const_iterator q
= p
->second
.begin();
1725 q
!= p
->second
.end();
1727 dout(15) << " clone_range " << p
->first
<< " "
1728 << q
.get_start() << "~" << q
.get_len() << dendl
;
1729 t
->clone_range(coll
, ghobject_t(p
->first
), ghobject_t(recovery_info
.soid
),
1730 q
.get_start(), q
.get_len(), q
.get_start());
1735 ObjectRecoveryInfo
ReplicatedBackend::recalc_subsets(
1736 const ObjectRecoveryInfo
& recovery_info
,
1737 SnapSetContext
*ssc
,
1738 ObcLockManager
&manager
)
1740 if (!recovery_info
.soid
.snap
|| recovery_info
.soid
.snap
>= CEPH_NOSNAP
)
1741 return recovery_info
;
1742 ObjectRecoveryInfo new_info
= recovery_info
;
1743 new_info
.copy_subset
.clear();
1744 new_info
.clone_subset
.clear();
1746 get_parent()->release_locks(manager
); // might already have locks
1748 ssc
->snapset
, new_info
.soid
, get_parent()->get_local_missing(),
1749 get_info().last_backfill
,
1750 new_info
.copy_subset
, new_info
.clone_subset
,
1755 bool ReplicatedBackend::handle_pull_response(
1756 pg_shard_t from
, const PushOp
&pop
, PullOp
*response
,
1757 list
<pull_complete_info
> *to_continue
,
1758 ObjectStore::Transaction
*t
)
1760 interval_set
<uint64_t> data_included
= pop
.data_included
;
1763 dout(10) << "handle_pull_response "
1764 << pop
.recovery_info
1765 << pop
.after_progress
1766 << " data.size() is " << data
.length()
1767 << " data_included: " << data_included
1769 if (pop
.version
== eversion_t()) {
1770 // replica doesn't have it!
1771 _failed_pull(from
, pop
.soid
);
1775 const hobject_t
&hoid
= pop
.soid
;
1776 assert((data_included
.empty() && data
.length() == 0) ||
1777 (!data_included
.empty() && data
.length() > 0));
1779 auto piter
= pulling
.find(hoid
);
1780 if (piter
== pulling
.end()) {
1784 PullInfo
&pi
= piter
->second
;
1785 if (pi
.recovery_info
.size
== (uint64_t(-1))) {
1786 pi
.recovery_info
.size
= pop
.recovery_info
.size
;
1787 pi
.recovery_info
.copy_subset
.intersection_of(
1788 pop
.recovery_info
.copy_subset
);
1790 // If primary doesn't have object info and didn't know version
1791 if (pi
.recovery_info
.version
== eversion_t()) {
1792 pi
.recovery_info
.version
= pop
.version
;
1795 bool first
= pi
.recovery_progress
.first
;
1797 // attrs only reference the origin bufferlist (decode from
1798 // MOSDPGPush message) whose size is much greater than attrs in
1799 // recovery. If obc cache it (get_obc maybe cache the attr), this
1800 // causes the whole origin bufferlist would not be free until obc
1801 // is evicted from obc cache. So rebuild the bufferlists before
1803 auto attrset
= pop
.attrset
;
1804 for (auto& a
: attrset
) {
1807 pi
.obc
= get_parent()->get_obc(pi
.recovery_info
.soid
, attrset
);
1808 pi
.recovery_info
.oi
= pi
.obc
->obs
.oi
;
1809 pi
.recovery_info
= recalc_subsets(
1816 interval_set
<uint64_t> usable_intervals
;
1817 bufferlist usable_data
;
1818 trim_pushed_data(pi
.recovery_info
.copy_subset
,
1823 data_included
= usable_intervals
;
1824 data
.claim(usable_data
);
1827 pi
.recovery_progress
= pop
.after_progress
;
1829 dout(10) << "new recovery_info " << pi
.recovery_info
1830 << ", new progress " << pi
.recovery_progress
1833 bool complete
= pi
.is_complete();
1835 submit_push_data(pi
.recovery_info
, first
,
1836 complete
, pi
.cache_dont_need
,
1837 data_included
, data
,
1843 pi
.stat
.num_keys_recovered
+= pop
.omap_entries
.size();
1844 pi
.stat
.num_bytes_recovered
+= data
.length();
1847 pi
.stat
.num_objects_recovered
++;
1848 clear_pull_from(piter
);
1849 to_continue
->push_back({hoid
, pi
.stat
});
1850 get_parent()->on_local_recover(
1851 hoid
, pi
.recovery_info
, pi
.obc
, false, t
);
1854 response
->soid
= pop
.soid
;
1855 response
->recovery_info
= pi
.recovery_info
;
1856 response
->recovery_progress
= pi
.recovery_progress
;
1861 void ReplicatedBackend::handle_push(
1862 pg_shard_t from
, const PushOp
&pop
, PushReplyOp
*response
,
1863 ObjectStore::Transaction
*t
)
1865 dout(10) << "handle_push "
1866 << pop
.recovery_info
1867 << pop
.after_progress
1871 bool first
= pop
.before_progress
.first
;
1872 bool complete
= pop
.after_progress
.data_complete
&&
1873 pop
.after_progress
.omap_complete
;
1875 response
->soid
= pop
.recovery_info
.soid
;
1876 submit_push_data(pop
.recovery_info
,
1879 true, // must be replicate
1888 get_parent()->on_local_recover(
1889 pop
.recovery_info
.soid
,
1891 ObjectContextRef(), // ok, is replica
1896 void ReplicatedBackend::send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
)
1898 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= pushes
.begin();
1901 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1903 get_osdmap()->get_epoch());
1906 vector
<PushOp
>::iterator j
= i
->second
.begin();
1907 while (j
!= i
->second
.end()) {
1909 uint64_t pushes
= 0;
1910 MOSDPGPush
*msg
= new MOSDPGPush();
1911 msg
->from
= get_parent()->whoami_shard();
1912 msg
->pgid
= get_parent()->primary_spg_t();
1913 msg
->map_epoch
= get_osdmap()->get_epoch();
1914 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1915 msg
->set_priority(prio
);
1917 (j
!= i
->second
.end() &&
1918 cost
< cct
->_conf
->osd_max_push_cost
&&
1919 pushes
< cct
->_conf
->osd_max_push_objects
) ;
1921 dout(20) << __func__
<< ": sending push " << *j
1922 << " to osd." << i
->first
<< dendl
;
1923 cost
+= j
->cost(cct
);
1925 msg
->pushes
.push_back(*j
);
1927 msg
->set_cost(cost
);
1928 get_parent()->send_message_osd_cluster(msg
, con
);
1933 void ReplicatedBackend::send_pulls(int prio
, map
<pg_shard_t
, vector
<PullOp
> > &pulls
)
1935 for (map
<pg_shard_t
, vector
<PullOp
> >::iterator i
= pulls
.begin();
1938 ConnectionRef con
= get_parent()->get_con_osd_cluster(
1940 get_osdmap()->get_epoch());
1943 dout(20) << __func__
<< ": sending pulls " << i
->second
1944 << " to osd." << i
->first
<< dendl
;
1945 MOSDPGPull
*msg
= new MOSDPGPull();
1946 msg
->from
= parent
->whoami_shard();
1947 msg
->set_priority(prio
);
1948 msg
->pgid
= get_parent()->primary_spg_t();
1949 msg
->map_epoch
= get_osdmap()->get_epoch();
1950 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
1951 msg
->set_pulls(&i
->second
);
1952 msg
->compute_cost(cct
);
1953 get_parent()->send_message_osd_cluster(msg
, con
);
1957 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo
&recovery_info
,
1958 const ObjectRecoveryProgress
&progress
,
1959 ObjectRecoveryProgress
*out_progress
,
1961 object_stat_sum_t
*stat
,
1962 bool cache_dont_need
)
1964 ObjectRecoveryProgress _new_progress
;
1966 out_progress
= &_new_progress
;
1967 ObjectRecoveryProgress
&new_progress
= *out_progress
;
1968 new_progress
= progress
;
1970 dout(7) << __func__
<< " " << recovery_info
.soid
1971 << " v " << recovery_info
.version
1972 << " size " << recovery_info
.size
1973 << " recovery_info: " << recovery_info
1976 eversion_t v
= recovery_info
.version
;
1977 if (progress
.first
) {
1978 int r
= store
->omap_get_header(coll
, ghobject_t(recovery_info
.soid
), &out_op
->omap_header
);
1980 dout(1) << __func__
<< " get omap header failed: " << cpp_strerror(-r
) << dendl
;
1983 r
= store
->getattrs(ch
, ghobject_t(recovery_info
.soid
), out_op
->attrset
);
1985 dout(1) << __func__
<< " getattrs failed: " << cpp_strerror(-r
) << dendl
;
1990 bufferlist bv
= out_op
->attrset
[OI_ATTR
];
1993 bufferlist::iterator bliter
= bv
.begin();
1994 ::decode(oi
, bliter
);
1996 dout(0) << __func__
<< ": bad object_info_t: " << recovery_info
.soid
<< dendl
;
2000 // If requestor didn't know the version, use ours
2001 if (v
== eversion_t()) {
2003 } else if (oi
.version
!= v
) {
2004 get_parent()->clog_error() << get_info().pgid
<< " push "
2005 << recovery_info
.soid
<< " v "
2006 << recovery_info
.version
2007 << " failed because local copy is "
2012 new_progress
.first
= false;
2014 // Once we provide the version subsequent requests will have it, so
2015 // at this point it must be known.
2016 assert(v
!= eversion_t());
2018 uint64_t available
= cct
->_conf
->osd_recovery_max_chunk
;
2019 if (!progress
.omap_complete
) {
2020 ObjectMap::ObjectMapIterator iter
=
2021 store
->get_omap_iterator(coll
,
2022 ghobject_t(recovery_info
.soid
));
2024 for (iter
->lower_bound(progress
.omap_recovered_to
);
2026 iter
->next(false)) {
2027 if (!out_op
->omap_entries
.empty() &&
2028 ((cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
> 0 &&
2029 out_op
->omap_entries
.size() >= cct
->_conf
->osd_recovery_max_omap_entries_per_chunk
) ||
2030 available
<= iter
->key().size() + iter
->value().length()))
2032 out_op
->omap_entries
.insert(make_pair(iter
->key(), iter
->value()));
2034 if ((iter
->key().size() + iter
->value().length()) <= available
)
2035 available
-= (iter
->key().size() + iter
->value().length());
2040 new_progress
.omap_complete
= true;
2042 new_progress
.omap_recovered_to
= iter
->key();
2045 if (available
> 0) {
2046 if (!recovery_info
.copy_subset
.empty()) {
2047 interval_set
<uint64_t> copy_subset
= recovery_info
.copy_subset
;
2048 map
<uint64_t, uint64_t> m
;
2049 int r
= store
->fiemap(ch
, ghobject_t(recovery_info
.soid
), 0,
2050 copy_subset
.range_end(), m
);
2052 interval_set
<uint64_t> fiemap_included(m
);
2053 copy_subset
.intersection_of(fiemap_included
);
2055 // intersection of copy_subset and empty interval_set would be empty anyway
2056 copy_subset
.clear();
2059 out_op
->data_included
.span_of(copy_subset
, progress
.data_recovered_to
,
2061 if (out_op
->data_included
.empty()) // zero filled section, skip to end!
2062 new_progress
.data_recovered_to
= recovery_info
.copy_subset
.range_end();
2064 new_progress
.data_recovered_to
= out_op
->data_included
.range_end();
2067 out_op
->data_included
.clear();
2070 for (interval_set
<uint64_t>::iterator p
= out_op
->data_included
.begin();
2071 p
!= out_op
->data_included
.end();
2074 int r
= store
->read(ch
, ghobject_t(recovery_info
.soid
),
2075 p
.get_start(), p
.get_len(), bit
,
2076 cache_dont_need
? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
: 0);
2077 if (cct
->_conf
->osd_debug_random_push_read_error
&&
2078 (rand() % (int)(cct
->_conf
->osd_debug_random_push_read_error
* 100.0)) == 0) {
2079 dout(0) << __func__
<< ": inject EIO " << recovery_info
.soid
<< dendl
;
2085 if (p
.get_len() != bit
.length()) {
2086 dout(10) << " extent " << p
.get_start() << "~" << p
.get_len()
2087 << " is actually " << p
.get_start() << "~" << bit
.length()
2089 interval_set
<uint64_t>::iterator save
= p
++;
2090 if (bit
.length() == 0)
2091 out_op
->data_included
.erase(save
); //Remove this empty interval
2093 save
.set_len(bit
.length());
2094 // Remove any other intervals present
2095 while (p
!= out_op
->data_included
.end()) {
2096 interval_set
<uint64_t>::iterator save
= p
++;
2097 out_op
->data_included
.erase(save
);
2099 new_progress
.data_complete
= true;
2100 out_op
->data
.claim_append(bit
);
2103 out_op
->data
.claim_append(bit
);
2106 if (new_progress
.is_complete(recovery_info
)) {
2107 new_progress
.data_complete
= true;
2109 stat
->num_objects_recovered
++;
2113 stat
->num_keys_recovered
+= out_op
->omap_entries
.size();
2114 stat
->num_bytes_recovered
+= out_op
->data
.length();
2117 get_parent()->get_logger()->inc(l_osd_push
);
2118 get_parent()->get_logger()->inc(l_osd_push_outb
, out_op
->data
.length());
2121 out_op
->version
= v
;
2122 out_op
->soid
= recovery_info
.soid
;
2123 out_op
->recovery_info
= recovery_info
;
2124 out_op
->after_progress
= new_progress
;
2125 out_op
->before_progress
= progress
;
2129 void ReplicatedBackend::prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
)
2131 op
->recovery_info
.version
= eversion_t();
2132 op
->version
= eversion_t();
2136 bool ReplicatedBackend::handle_push_reply(
2137 pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
)
2139 const hobject_t
&soid
= op
.soid
;
2140 if (pushing
.count(soid
) == 0) {
2141 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2142 << ", or anybody else"
2145 } else if (pushing
[soid
].count(peer
) == 0) {
2146 dout(10) << "huh, i wasn't pushing " << soid
<< " to osd." << peer
2150 PushInfo
*pi
= &pushing
[soid
][peer
];
2151 bool error
= pushing
[soid
].begin()->second
.recovery_progress
.error
;
2153 if (!pi
->recovery_progress
.data_complete
&& !error
) {
2154 dout(10) << " pushing more from, "
2155 << pi
->recovery_progress
.data_recovered_to
2156 << " of " << pi
->recovery_info
.copy_subset
<< dendl
;
2157 ObjectRecoveryProgress new_progress
;
2158 int r
= build_push_op(
2160 pi
->recovery_progress
, &new_progress
, reply
,
2162 // Handle the case of a read error right after we wrote, which is
2163 // hopefuilly extremely rare.
2165 dout(5) << __func__
<< ": oid " << soid
<< " error " << r
<< dendl
;
2170 pi
->recovery_progress
= new_progress
;
2176 get_parent()->on_peer_recover( peer
, soid
, pi
->recovery_info
);
2178 get_parent()->release_locks(pi
->lock_manager
);
2179 object_stat_sum_t stat
= pi
->stat
;
2180 eversion_t v
= pi
->recovery_info
.version
;
2181 pushing
[soid
].erase(peer
);
2184 if (pushing
[soid
].empty()) {
2186 get_parent()->on_global_recover(soid
, stat
, false);
2188 get_parent()->on_primary_error(soid
, v
);
2189 pushing
.erase(soid
);
2191 // This looks weird, but we erased the current peer and need to remember
2192 // the error on any other one, while getting more acks.
2194 pushing
[soid
].begin()->second
.recovery_progress
.error
= true;
2195 dout(10) << "pushed " << soid
<< ", still waiting for push ack from "
2196 << pushing
[soid
].size() << " others" << dendl
;
2203 void ReplicatedBackend::handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
)
2205 const hobject_t
&soid
= op
.soid
;
2207 int r
= store
->stat(ch
, ghobject_t(soid
), &st
);
2209 get_parent()->clog_error() << get_info().pgid
<< " "
2210 << peer
<< " tried to pull " << soid
2211 << " but got " << cpp_strerror(-r
);
2212 prep_push_op_blank(soid
, reply
);
2214 ObjectRecoveryInfo
&recovery_info
= op
.recovery_info
;
2215 ObjectRecoveryProgress
&progress
= op
.recovery_progress
;
2216 if (progress
.first
&& recovery_info
.size
== ((uint64_t)-1)) {
2217 // Adjust size and copy_subset
2218 recovery_info
.size
= st
.st_size
;
2219 recovery_info
.copy_subset
.clear();
2221 recovery_info
.copy_subset
.insert(0, st
.st_size
);
2222 assert(recovery_info
.clone_subset
.empty());
2225 r
= build_push_op(recovery_info
, progress
, 0, reply
);
2227 prep_push_op_blank(soid
, reply
);
2232 * trim received data to remove what we don't want
2234 * @param copy_subset intervals we want
2235 * @param data_included intervals we got
2236 * @param data_recieved data we got
2237 * @param intervals_usable intervals we want to keep
2238 * @param data_usable matching data we want to keep
2240 void ReplicatedBackend::trim_pushed_data(
2241 const interval_set
<uint64_t> ©_subset
,
2242 const interval_set
<uint64_t> &intervals_received
,
2243 bufferlist data_received
,
2244 interval_set
<uint64_t> *intervals_usable
,
2245 bufferlist
*data_usable
)
2247 if (intervals_received
.subset_of(copy_subset
)) {
2248 *intervals_usable
= intervals_received
;
2249 *data_usable
= data_received
;
2253 intervals_usable
->intersection_of(copy_subset
,
2254 intervals_received
);
2257 for (interval_set
<uint64_t>::const_iterator p
= intervals_received
.begin();
2258 p
!= intervals_received
.end();
2260 interval_set
<uint64_t> x
;
2261 x
.insert(p
.get_start(), p
.get_len());
2262 x
.intersection_of(copy_subset
);
2263 for (interval_set
<uint64_t>::const_iterator q
= x
.begin();
2267 uint64_t data_off
= off
+ (q
.get_start() - p
.get_start());
2268 sub
.substr_of(data_received
, data_off
, q
.get_len());
2269 data_usable
->claim_append(sub
);
2275 void ReplicatedBackend::_failed_pull(pg_shard_t from
, const hobject_t
&soid
)
2277 dout(20) << __func__
<< ": " << soid
<< " from " << from
<< dendl
;
2278 list
<pg_shard_t
> fl
= { from
};
2279 get_parent()->failed_push(fl
, soid
);
2281 clear_pull(pulling
.find(soid
));
2284 void ReplicatedBackend::clear_pull_from(
2285 map
<hobject_t
, PullInfo
>::iterator piter
)
2287 auto from
= piter
->second
.from
;
2288 pull_from_peer
[from
].erase(piter
->second
.soid
);
2289 if (pull_from_peer
[from
].empty())
2290 pull_from_peer
.erase(from
);
2293 void ReplicatedBackend::clear_pull(
2294 map
<hobject_t
, PullInfo
>::iterator piter
,
2295 bool clear_pull_from_peer
)
2297 if (clear_pull_from_peer
) {
2298 clear_pull_from(piter
);
2300 get_parent()->release_locks(piter
->second
.lock_manager
);
2301 pulling
.erase(piter
);
2304 int ReplicatedBackend::start_pushes(
2305 const hobject_t
&soid
,
2306 ObjectContextRef obc
,
2309 list
< map
<pg_shard_t
, pg_missing_t
>::const_iterator
> shards
;
2311 dout(20) << __func__
<< " soid " << soid
<< dendl
;
2313 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2314 for (set
<pg_shard_t
>::iterator i
=
2315 get_parent()->get_actingbackfill_shards().begin();
2316 i
!= get_parent()->get_actingbackfill_shards().end();
2318 if (*i
== get_parent()->whoami_shard()) continue;
2319 pg_shard_t peer
= *i
;
2320 map
<pg_shard_t
, pg_missing_t
>::const_iterator j
=
2321 get_parent()->get_shard_missing().find(peer
);
2322 assert(j
!= get_parent()->get_shard_missing().end());
2323 if (j
->second
.is_missing(soid
)) {
2324 shards
.push_back(j
);
2328 // If more than 1 read will occur ignore possible request to not cache
2329 bool cache
= shards
.size() == 1 ? h
->cache_dont_need
: false;
2331 for (auto j
: shards
) {
2332 pg_shard_t peer
= j
->first
;
2333 h
->pushes
[peer
].push_back(PushOp());
2334 int r
= prep_push_to_replica(obc
, soid
, peer
,
2335 &(h
->pushes
[peer
].back()), cache
);
2337 // Back out all failed reads
2338 for (auto k
: shards
) {
2339 pg_shard_t p
= k
->first
;
2340 dout(10) << __func__
<< " clean up peer " << p
<< dendl
;
2341 h
->pushes
[p
].pop_back();
2342 if (p
== peer
) break;
2347 return shards
.size();