1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
27 #include "PrimaryLogPG.h"
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
33 #define dout_prefix _prefix(_dout, this)
34 static ostream
& _prefix(std::ostream
*_dout
, ECBackend
*pgb
) {
35 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
38 struct ECRecoveryHandle
: public PGBackend::RecoveryHandle
{
39 list
<ECBackend::RecoveryOp
> ops
;
42 ostream
&operator<<(ostream
&lhs
, const ECBackend::pipeline_state_t
&rhs
) {
43 switch (rhs
.pipeline_state
) {
44 case ECBackend::pipeline_state_t::CACHE_VALID
:
45 return lhs
<< "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID
:
47 return lhs
<< "CACHE_INVALID";
49 assert(0 == "invalid pipeline state");
51 return lhs
; // unreachable
54 static ostream
&operator<<(ostream
&lhs
, const map
<pg_shard_t
, bufferlist
> &rhs
)
57 for (map
<pg_shard_t
, bufferlist
>::const_iterator i
= rhs
.begin();
62 lhs
<< make_pair(i
->first
, i
->second
.length());
67 static ostream
&operator<<(ostream
&lhs
, const map
<int, bufferlist
> &rhs
)
70 for (map
<int, bufferlist
>::const_iterator i
= rhs
.begin();
75 lhs
<< make_pair(i
->first
, i
->second
.length());
80 static ostream
&operator<<(
82 const boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &rhs
)
84 return lhs
<< "(" << rhs
.get
<0>() << ", "
85 << rhs
.get
<1>() << ", " << rhs
.get
<2>() << ")";
88 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_request_t
&rhs
)
90 return lhs
<< "read_request_t(to_read=[" << rhs
.to_read
<< "]"
91 << ", need=" << rhs
.need
92 << ", want_attrs=" << rhs
.want_attrs
96 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_result_t
&rhs
)
98 lhs
<< "read_result_t(r=" << rhs
.r
99 << ", errors=" << rhs
.errors
;
101 lhs
<< ", attrs=" << rhs
.attrs
.get();
105 return lhs
<< ", returned=" << rhs
.returned
<< ")";
108 ostream
&operator<<(ostream
&lhs
, const ECBackend::ReadOp
&rhs
)
110 lhs
<< "ReadOp(tid=" << rhs
.tid
;
111 if (rhs
.op
&& rhs
.op
->get_req()) {
113 rhs
.op
->get_req()->print(lhs
);
115 return lhs
<< ", to_read=" << rhs
.to_read
116 << ", complete=" << rhs
.complete
117 << ", priority=" << rhs
.priority
118 << ", obj_to_source=" << rhs
.obj_to_source
119 << ", source_to_obj=" << rhs
.source_to_obj
120 << ", in_progress=" << rhs
.in_progress
<< ")";
123 void ECBackend::ReadOp::dump(Formatter
*f
) const
125 f
->dump_unsigned("tid", tid
);
126 if (op
&& op
->get_req()) {
127 f
->dump_stream("op") << *(op
->get_req());
129 f
->dump_stream("to_read") << to_read
;
130 f
->dump_stream("complete") << complete
;
131 f
->dump_int("priority", priority
);
132 f
->dump_stream("obj_to_source") << obj_to_source
;
133 f
->dump_stream("source_to_obj") << source_to_obj
;
134 f
->dump_stream("in_progress") << in_progress
;
137 ostream
&operator<<(ostream
&lhs
, const ECBackend::Op
&rhs
)
139 lhs
<< "Op(" << rhs
.hoid
140 << " v=" << rhs
.version
141 << " tt=" << rhs
.trim_to
142 << " tid=" << rhs
.tid
143 << " reqid=" << rhs
.reqid
;
144 if (rhs
.client_op
&& rhs
.client_op
->get_req()) {
145 lhs
<< " client_op=";
146 rhs
.client_op
->get_req()->print(lhs
);
148 lhs
<< " roll_forward_to=" << rhs
.roll_forward_to
149 << " temp_added=" << rhs
.temp_added
150 << " temp_cleared=" << rhs
.temp_cleared
151 << " pending_read=" << rhs
.pending_read
152 << " remote_read=" << rhs
.remote_read
153 << " remote_read_result=" << rhs
.remote_read_result
154 << " pending_apply=" << rhs
.pending_apply
155 << " pending_commit=" << rhs
.pending_commit
156 << " plan.to_read=" << rhs
.plan
.to_read
157 << " plan.will_write=" << rhs
.plan
.will_write
162 ostream
&operator<<(ostream
&lhs
, const ECBackend::RecoveryOp
&rhs
)
164 return lhs
<< "RecoveryOp("
165 << "hoid=" << rhs
.hoid
167 << " missing_on=" << rhs
.missing_on
168 << " missing_on_shards=" << rhs
.missing_on_shards
169 << " recovery_info=" << rhs
.recovery_info
170 << " recovery_progress=" << rhs
.recovery_progress
171 << " obc refcount=" << rhs
.obc
.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs
.state
)
173 << " waiting_on_pushes=" << rhs
.waiting_on_pushes
174 << " extent_requested=" << rhs
.extent_requested
178 void ECBackend::RecoveryOp::dump(Formatter
*f
) const
180 f
->dump_stream("hoid") << hoid
;
181 f
->dump_stream("v") << v
;
182 f
->dump_stream("missing_on") << missing_on
;
183 f
->dump_stream("missing_on_shards") << missing_on_shards
;
184 f
->dump_stream("recovery_info") << recovery_info
;
185 f
->dump_stream("recovery_progress") << recovery_progress
;
186 f
->dump_stream("state") << tostr(state
);
187 f
->dump_stream("waiting_on_pushes") << waiting_on_pushes
;
188 f
->dump_stream("extent_requested") << extent_requested
;
191 ECBackend::ECBackend(
192 PGBackend::Listener
*pg
,
194 ObjectStore::CollectionHandle
&ch
,
197 ErasureCodeInterfaceRef ec_impl
,
198 uint64_t stripe_width
)
199 : PGBackend(cct
, pg
, store
, coll
, ch
),
201 sinfo(ec_impl
->get_data_chunk_count(), stripe_width
) {
202 assert((ec_impl
->get_data_chunk_count() *
203 ec_impl
->get_chunk_size(stripe_width
)) == stripe_width
);
206 PGBackend::RecoveryHandle
*ECBackend::open_recovery_op()
208 return new ECRecoveryHandle
;
211 void ECBackend::_failed_push(const hobject_t
&hoid
,
212 pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
)
214 ECBackend::read_result_t
&res
= in
.second
;
215 dout(10) << __func__
<< ": Read error " << hoid
<< " r="
216 << res
.r
<< " errors=" << res
.errors
<< dendl
;
217 dout(10) << __func__
<< ": canceling recovery op for obj " << hoid
219 assert(recovery_ops
.count(hoid
));
220 eversion_t v
= recovery_ops
[hoid
].v
;
221 recovery_ops
.erase(hoid
);
224 for (auto&& i
: res
.errors
) {
225 fl
.push_back(i
.first
);
227 get_parent()->failed_push(fl
, hoid
);
228 get_parent()->backfill_add_missing(hoid
, v
);
229 get_parent()->finish_degraded_object(hoid
);
232 struct OnRecoveryReadComplete
:
233 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
237 OnRecoveryReadComplete(ECBackend
*pg
, const hobject_t
&hoid
)
238 : pg(pg
), hoid(hoid
) {}
239 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
240 ECBackend::read_result_t
&res
= in
.second
;
241 if (!(res
.r
== 0 && res
.errors
.empty())) {
242 pg
->_failed_push(hoid
, in
);
245 assert(res
.returned
.size() == 1);
246 pg
->handle_recovery_read_complete(
254 struct RecoveryMessages
{
256 ECBackend::read_request_t
> reads
;
259 const hobject_t
&hoid
, uint64_t off
, uint64_t len
,
260 const set
<pg_shard_t
> &need
,
262 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
263 to_read
.push_back(boost::make_tuple(off
, len
, 0));
264 assert(!reads
.count(hoid
));
268 ECBackend::read_request_t(
272 new OnRecoveryReadComplete(
277 map
<pg_shard_t
, vector
<PushOp
> > pushes
;
278 map
<pg_shard_t
, vector
<PushReplyOp
> > push_replies
;
279 ObjectStore::Transaction t
;
280 RecoveryMessages() {}
281 ~RecoveryMessages(){}
284 void ECBackend::handle_recovery_push(
289 if (get_parent()->check_failsafe_full(ss
)) {
290 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
294 bool oneshot
= op
.before_progress
.first
&& op
.after_progress
.data_complete
;
297 tobj
= ghobject_t(op
.soid
, ghobject_t::NO_GEN
,
298 get_parent()->whoami_shard().shard
);
300 tobj
= ghobject_t(get_parent()->get_temp_recovery_object(op
.soid
,
303 get_parent()->whoami_shard().shard
);
304 if (op
.before_progress
.first
) {
305 dout(10) << __func__
<< ": Adding oid "
306 << tobj
.hobj
<< " in the temp collection" << dendl
;
307 add_temp_obj(tobj
.hobj
);
311 if (op
.before_progress
.first
) {
312 m
->t
.remove(coll
, tobj
);
313 m
->t
.touch(coll
, tobj
);
316 if (!op
.data_included
.empty()) {
317 uint64_t start
= op
.data_included
.range_start();
318 uint64_t end
= op
.data_included
.range_end();
319 assert(op
.data
.length() == (end
- start
));
328 assert(op
.data
.length() == 0);
331 if (op
.before_progress
.first
) {
332 assert(op
.attrset
.count(string("_")));
339 if (op
.after_progress
.data_complete
&& !oneshot
) {
340 dout(10) << __func__
<< ": Removing oid "
341 << tobj
.hobj
<< " from the temp collection" << dendl
;
342 clear_temp_obj(tobj
.hobj
);
343 m
->t
.remove(coll
, ghobject_t(
344 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
345 m
->t
.collection_move_rename(
348 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
350 if (op
.after_progress
.data_complete
) {
351 if ((get_parent()->pgb_is_primary())) {
352 assert(recovery_ops
.count(op
.soid
));
353 assert(recovery_ops
[op
.soid
].obc
);
354 get_parent()->on_local_recover(
357 recovery_ops
[op
.soid
].obc
,
361 get_parent()->on_local_recover(
369 m
->push_replies
[get_parent()->primary_shard()].push_back(PushReplyOp());
370 m
->push_replies
[get_parent()->primary_shard()].back().soid
= op
.soid
;
373 void ECBackend::handle_recovery_push_reply(
374 const PushReplyOp
&op
,
378 if (!recovery_ops
.count(op
.soid
))
380 RecoveryOp
&rop
= recovery_ops
[op
.soid
];
381 assert(rop
.waiting_on_pushes
.count(from
));
382 rop
.waiting_on_pushes
.erase(from
);
383 continue_recovery_op(rop
, m
);
386 void ECBackend::handle_recovery_read_complete(
387 const hobject_t
&hoid
,
388 boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &to_read
,
389 boost::optional
<map
<string
, bufferlist
> > attrs
,
392 dout(10) << __func__
<< ": returned " << hoid
<< " "
393 << "(" << to_read
.get
<0>()
394 << ", " << to_read
.get
<1>()
395 << ", " << to_read
.get
<2>()
398 assert(recovery_ops
.count(hoid
));
399 RecoveryOp
&op
= recovery_ops
[hoid
];
400 assert(op
.returned_data
.empty());
401 map
<int, bufferlist
*> target
;
402 for (set
<shard_id_t
>::iterator i
= op
.missing_on_shards
.begin();
403 i
!= op
.missing_on_shards
.end();
405 target
[*i
] = &(op
.returned_data
[*i
]);
407 map
<int, bufferlist
> from
;
408 for(map
<pg_shard_t
, bufferlist
>::iterator i
= to_read
.get
<2>().begin();
409 i
!= to_read
.get
<2>().end();
411 from
[i
->first
.shard
].claim(i
->second
);
413 dout(10) << __func__
<< ": " << from
<< dendl
;
414 int r
= ECUtil::decode(sinfo
, ec_impl
, from
, target
);
417 op
.xattrs
.swap(*attrs
);
420 // attrs only reference the origin bufferlist (decode from
421 // ECSubReadReply message) whose size is much greater than attrs
422 // in recovery. If obc cache it (get_obc maybe cache the attr),
423 // this causes the whole origin bufferlist would not be free
424 // until obc is evicted from obc cache. So rebuild the
425 // bufferlist before cache it.
426 for (map
<string
, bufferlist
>::iterator it
= op
.xattrs
.begin();
427 it
!= op
.xattrs
.end();
429 it
->second
.rebuild();
431 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
432 // of the backend (see bug #12983)
433 map
<string
, bufferlist
> sanitized_attrs(op
.xattrs
);
434 sanitized_attrs
.erase(ECUtil::get_hinfo_key());
435 op
.obc
= get_parent()->get_obc(hoid
, sanitized_attrs
);
437 op
.recovery_info
.size
= op
.obc
->obs
.oi
.size
;
438 op
.recovery_info
.oi
= op
.obc
->obs
.oi
;
441 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
442 if (op
.obc
->obs
.oi
.size
> 0) {
443 assert(op
.xattrs
.count(ECUtil::get_hinfo_key()));
444 bufferlist::iterator bp
= op
.xattrs
[ECUtil::get_hinfo_key()].begin();
447 op
.hinfo
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
449 assert(op
.xattrs
.size());
451 continue_recovery_op(op
, m
);
454 struct SendPushReplies
: public Context
{
455 PGBackend::Listener
*l
;
457 map
<int, MOSDPGPushReply
*> replies
;
459 PGBackend::Listener
*l
,
461 map
<int, MOSDPGPushReply
*> &in
) : l(l
), epoch(epoch
) {
464 void finish(int) override
{
465 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
468 l
->send_message_osd_cluster(i
->first
, i
->second
, epoch
);
472 ~SendPushReplies() override
{
473 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
482 void ECBackend::dispatch_recovery_messages(RecoveryMessages
&m
, int priority
)
484 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= m
.pushes
.begin();
486 m
.pushes
.erase(i
++)) {
487 MOSDPGPush
*msg
= new MOSDPGPush();
488 msg
->set_priority(priority
);
489 msg
->map_epoch
= get_parent()->get_epoch();
490 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
491 msg
->from
= get_parent()->whoami_shard();
492 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
493 msg
->pushes
.swap(i
->second
);
494 msg
->compute_cost(cct
);
495 get_parent()->send_message(
499 map
<int, MOSDPGPushReply
*> replies
;
500 for (map
<pg_shard_t
, vector
<PushReplyOp
> >::iterator i
=
501 m
.push_replies
.begin();
502 i
!= m
.push_replies
.end();
503 m
.push_replies
.erase(i
++)) {
504 MOSDPGPushReply
*msg
= new MOSDPGPushReply();
505 msg
->set_priority(priority
);
506 msg
->map_epoch
= get_parent()->get_epoch();
507 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
508 msg
->from
= get_parent()->whoami_shard();
509 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
510 msg
->replies
.swap(i
->second
);
511 msg
->compute_cost(cct
);
512 replies
.insert(make_pair(i
->first
.osd
, msg
));
515 if (!replies
.empty()) {
516 (m
.t
).register_on_complete(
517 get_parent()->bless_context(
520 get_parent()->get_epoch(),
522 get_parent()->queue_transaction(std::move(m
.t
));
534 void ECBackend::continue_recovery_op(
538 dout(10) << __func__
<< ": continuing " << op
<< dendl
;
541 case RecoveryOp::IDLE
: {
543 op
.state
= RecoveryOp::READING
;
544 assert(!op
.recovery_progress
.data_complete
);
545 set
<int> want(op
.missing_on_shards
.begin(), op
.missing_on_shards
.end());
546 uint64_t from
= op
.recovery_progress
.data_recovered_to
;
547 uint64_t amount
= get_recovery_chunk_size();
549 if (op
.recovery_progress
.first
&& op
.obc
) {
550 /* We've got the attrs and the hinfo, might as well use them */
551 op
.hinfo
= get_hash_info(op
.hoid
);
553 op
.xattrs
= op
.obc
->attr_cache
;
554 ::encode(*(op
.hinfo
), op
.xattrs
[ECUtil::get_hinfo_key()]);
557 set
<pg_shard_t
> to_read
;
558 int r
= get_min_avail_to_read_shards(
559 op
.hoid
, want
, true, false, &to_read
);
561 // we must have lost a recovery source
562 assert(!op
.recovery_progress
.first
);
563 dout(10) << __func__
<< ": canceling recovery op for obj " << op
.hoid
565 get_parent()->cancel_pull(op
.hoid
);
566 recovery_ops
.erase(op
.hoid
);
572 op
.recovery_progress
.data_recovered_to
,
575 op
.recovery_progress
.first
&& !op
.obc
);
576 op
.extent_requested
= make_pair(
579 dout(10) << __func__
<< ": IDLE return " << op
<< dendl
;
582 case RecoveryOp::READING
: {
583 // read completed, start write
584 assert(op
.xattrs
.size());
585 assert(op
.returned_data
.size());
586 op
.state
= RecoveryOp::WRITING
;
587 ObjectRecoveryProgress after_progress
= op
.recovery_progress
;
588 after_progress
.data_recovered_to
+= op
.extent_requested
.second
;
589 after_progress
.first
= false;
590 if (after_progress
.data_recovered_to
>= op
.obc
->obs
.oi
.size
) {
591 after_progress
.data_recovered_to
=
592 sinfo
.logical_to_next_stripe_offset(
593 op
.obc
->obs
.oi
.size
);
594 after_progress
.data_complete
= true;
596 for (set
<pg_shard_t
>::iterator mi
= op
.missing_on
.begin();
597 mi
!= op
.missing_on
.end();
599 assert(op
.returned_data
.count(mi
->shard
));
600 m
->pushes
[*mi
].push_back(PushOp());
601 PushOp
&pop
= m
->pushes
[*mi
].back();
604 pop
.data
= op
.returned_data
[mi
->shard
];
605 dout(10) << __func__
<< ": before_progress=" << op
.recovery_progress
606 << ", after_progress=" << after_progress
607 << ", pop.data.length()=" << pop
.data
.length()
608 << ", size=" << op
.obc
->obs
.oi
.size
<< dendl
;
611 sinfo
.aligned_logical_offset_to_chunk_offset(
612 after_progress
.data_recovered_to
-
613 op
.recovery_progress
.data_recovered_to
)
615 if (pop
.data
.length())
616 pop
.data_included
.insert(
617 sinfo
.aligned_logical_offset_to_chunk_offset(
618 op
.recovery_progress
.data_recovered_to
),
621 if (op
.recovery_progress
.first
) {
622 pop
.attrset
= op
.xattrs
;
624 pop
.recovery_info
= op
.recovery_info
;
625 pop
.before_progress
= op
.recovery_progress
;
626 pop
.after_progress
= after_progress
;
627 if (*mi
!= get_parent()->primary_shard())
628 get_parent()->begin_peer_recover(
632 op
.returned_data
.clear();
633 op
.waiting_on_pushes
= op
.missing_on
;
634 op
.recovery_progress
= after_progress
;
635 dout(10) << __func__
<< ": READING return " << op
<< dendl
;
638 case RecoveryOp::WRITING
: {
639 if (op
.waiting_on_pushes
.empty()) {
640 if (op
.recovery_progress
.data_complete
) {
641 op
.state
= RecoveryOp::COMPLETE
;
642 for (set
<pg_shard_t
>::iterator i
= op
.missing_on
.begin();
643 i
!= op
.missing_on
.end();
645 if (*i
!= get_parent()->primary_shard()) {
646 dout(10) << __func__
<< ": on_peer_recover on " << *i
647 << ", obj " << op
.hoid
<< dendl
;
648 get_parent()->on_peer_recover(
654 object_stat_sum_t stat
;
655 stat
.num_bytes_recovered
= op
.recovery_info
.size
;
656 stat
.num_keys_recovered
= 0; // ??? op ... omap_entries.size(); ?
657 stat
.num_objects_recovered
= 1;
658 get_parent()->on_global_recover(op
.hoid
, stat
, false);
659 dout(10) << __func__
<< ": WRITING return " << op
<< dendl
;
660 recovery_ops
.erase(op
.hoid
);
663 op
.state
= RecoveryOp::IDLE
;
664 dout(10) << __func__
<< ": WRITING continue " << op
<< dendl
;
670 // should never be called once complete
671 case RecoveryOp::COMPLETE
:
679 void ECBackend::run_recovery_op(
683 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
685 for (list
<RecoveryOp
>::iterator i
= h
->ops
.begin();
688 dout(10) << __func__
<< ": starting " << *i
<< dendl
;
689 assert(!recovery_ops
.count(i
->hoid
));
690 RecoveryOp
&op
= recovery_ops
.insert(make_pair(i
->hoid
, *i
)).first
->second
;
691 continue_recovery_op(op
, &m
);
694 dispatch_recovery_messages(m
, priority
);
695 send_recovery_deletes(priority
, h
->deletes
);
699 int ECBackend::recover_object(
700 const hobject_t
&hoid
,
702 ObjectContextRef head
,
703 ObjectContextRef obc
,
706 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
707 h
->ops
.push_back(RecoveryOp());
709 h
->ops
.back().hoid
= hoid
;
710 h
->ops
.back().obc
= obc
;
711 h
->ops
.back().recovery_info
.soid
= hoid
;
712 h
->ops
.back().recovery_info
.version
= v
;
714 h
->ops
.back().recovery_info
.size
= obc
->obs
.oi
.size
;
715 h
->ops
.back().recovery_info
.oi
= obc
->obs
.oi
;
717 if (hoid
.is_snap()) {
720 h
->ops
.back().recovery_info
.ss
= obc
->ssc
->snapset
;
723 h
->ops
.back().recovery_info
.ss
= head
->ssc
->snapset
;
725 assert(0 == "neither obc nor head set for a snap object");
728 h
->ops
.back().recovery_progress
.omap_complete
= true;
729 for (set
<pg_shard_t
>::const_iterator i
=
730 get_parent()->get_actingbackfill_shards().begin();
731 i
!= get_parent()->get_actingbackfill_shards().end();
733 dout(10) << "checking " << *i
<< dendl
;
734 if (get_parent()->get_shard_missing(*i
).is_missing(hoid
)) {
735 h
->ops
.back().missing_on
.insert(*i
);
736 h
->ops
.back().missing_on_shards
.insert(i
->shard
);
739 dout(10) << __func__
<< ": built op " << h
->ops
.back() << dendl
;
743 bool ECBackend::can_handle_while_inactive(
749 bool ECBackend::_handle_message(
752 dout(10) << __func__
<< ": " << *_op
->get_req() << dendl
;
753 int priority
= _op
->get_req()->get_priority();
754 switch (_op
->get_req()->get_type()) {
755 case MSG_OSD_EC_WRITE
: {
756 // NOTE: this is non-const because handle_sub_write modifies the embedded
757 // ObjectStore::Transaction in place (and then std::move's it). It does
758 // not conflict with ECSubWrite's operator<<.
759 MOSDECSubOpWrite
*op
= static_cast<MOSDECSubOpWrite
*>(
760 _op
->get_nonconst_req());
761 handle_sub_write(op
->op
.from
, _op
, op
->op
, _op
->pg_trace
);
764 case MSG_OSD_EC_WRITE_REPLY
: {
765 const MOSDECSubOpWriteReply
*op
= static_cast<const MOSDECSubOpWriteReply
*>(
767 handle_sub_write_reply(op
->op
.from
, op
->op
, _op
->pg_trace
);
770 case MSG_OSD_EC_READ
: {
771 const MOSDECSubOpRead
*op
= static_cast<const MOSDECSubOpRead
*>(_op
->get_req());
772 MOSDECSubOpReadReply
*reply
= new MOSDECSubOpReadReply
;
773 reply
->pgid
= get_parent()->primary_spg_t();
774 reply
->map_epoch
= get_parent()->get_epoch();
775 reply
->min_epoch
= get_parent()->get_interval_start_epoch();
776 handle_sub_read(op
->op
.from
, op
->op
, &(reply
->op
), _op
->pg_trace
);
777 reply
->trace
= _op
->pg_trace
;
778 get_parent()->send_message_osd_cluster(
779 op
->op
.from
.osd
, reply
, get_parent()->get_epoch());
782 case MSG_OSD_EC_READ_REPLY
: {
783 // NOTE: this is non-const because handle_sub_read_reply steals resulting
784 // buffers. It does not conflict with ECSubReadReply operator<<.
785 MOSDECSubOpReadReply
*op
= static_cast<MOSDECSubOpReadReply
*>(
786 _op
->get_nonconst_req());
788 handle_sub_read_reply(op
->op
.from
, op
->op
, &rm
, _op
->pg_trace
);
789 dispatch_recovery_messages(rm
, priority
);
792 case MSG_OSD_PG_PUSH
: {
793 const MOSDPGPush
*op
= static_cast<const MOSDPGPush
*>(_op
->get_req());
795 for (vector
<PushOp
>::const_iterator i
= op
->pushes
.begin();
796 i
!= op
->pushes
.end();
798 handle_recovery_push(*i
, &rm
);
800 dispatch_recovery_messages(rm
, priority
);
803 case MSG_OSD_PG_PUSH_REPLY
: {
804 const MOSDPGPushReply
*op
= static_cast<const MOSDPGPushReply
*>(
807 for (vector
<PushReplyOp
>::const_iterator i
= op
->replies
.begin();
808 i
!= op
->replies
.end();
810 handle_recovery_push_reply(*i
, op
->from
, &rm
);
812 dispatch_recovery_messages(rm
, priority
);
821 struct SubWriteCommitted
: public Context
{
826 eversion_t last_complete
;
827 const ZTracer::Trace trace
;
833 eversion_t last_complete
,
834 const ZTracer::Trace
&trace
)
835 : pg(pg
), msg(msg
), tid(tid
),
836 version(version
), last_complete(last_complete
), trace(trace
) {}
837 void finish(int) override
{
839 msg
->mark_event("sub_op_committed");
840 pg
->sub_write_committed(tid
, version
, last_complete
, trace
);
843 void ECBackend::sub_write_committed(
844 ceph_tid_t tid
, eversion_t version
, eversion_t last_complete
,
845 const ZTracer::Trace
&trace
) {
846 if (get_parent()->pgb_is_primary()) {
847 ECSubWriteReply reply
;
849 reply
.last_complete
= last_complete
;
850 reply
.committed
= true;
851 reply
.from
= get_parent()->whoami_shard();
852 handle_sub_write_reply(
853 get_parent()->whoami_shard(),
856 get_parent()->update_last_complete_ondisk(last_complete
);
857 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
858 r
->pgid
= get_parent()->primary_spg_t();
859 r
->map_epoch
= get_parent()->get_epoch();
860 r
->min_epoch
= get_parent()->get_interval_start_epoch();
862 r
->op
.last_complete
= last_complete
;
863 r
->op
.committed
= true;
864 r
->op
.from
= get_parent()->whoami_shard();
865 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
867 r
->trace
.event("sending sub op commit");
868 get_parent()->send_message_osd_cluster(
869 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
873 struct SubWriteApplied
: public Context
{
878 const ZTracer::Trace trace
;
884 const ZTracer::Trace
&trace
)
885 : pg(pg
), msg(msg
), tid(tid
), version(version
), trace(trace
) {}
886 void finish(int) override
{
888 msg
->mark_event("sub_op_applied");
889 pg
->sub_write_applied(tid
, version
, trace
);
892 void ECBackend::sub_write_applied(
893 ceph_tid_t tid
, eversion_t version
,
894 const ZTracer::Trace
&trace
) {
895 parent
->op_applied(version
);
896 if (get_parent()->pgb_is_primary()) {
897 ECSubWriteReply reply
;
898 reply
.from
= get_parent()->whoami_shard();
900 reply
.applied
= true;
901 handle_sub_write_reply(
902 get_parent()->whoami_shard(),
905 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
906 r
->pgid
= get_parent()->primary_spg_t();
907 r
->map_epoch
= get_parent()->get_epoch();
908 r
->min_epoch
= get_parent()->get_interval_start_epoch();
909 r
->op
.from
= get_parent()->whoami_shard();
911 r
->op
.applied
= true;
912 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
914 r
->trace
.event("sending sub op apply");
915 get_parent()->send_message_osd_cluster(
916 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
920 void ECBackend::handle_sub_write(
924 const ZTracer::Trace
&trace
,
925 Context
*on_local_applied_sync
)
929 trace
.event("handle_sub_write");
930 assert(!get_parent()->get_log().get_missing().is_missing(op
.soid
));
931 if (!get_parent()->pgb_is_primary())
932 get_parent()->update_stats(op
.stats
);
933 ObjectStore::Transaction localt
;
934 if (!op
.temp_added
.empty()) {
935 add_temp_objs(op
.temp_added
);
938 for (set
<hobject_t
>::iterator i
= op
.temp_removed
.begin();
939 i
!= op
.temp_removed
.end();
941 dout(10) << __func__
<< ": removing object " << *i
942 << " since we won't get the transaction" << dendl
;
948 get_parent()->whoami_shard().shard
));
951 clear_temp_objs(op
.temp_removed
);
952 get_parent()->log_operation(
954 op
.updated_hit_set_history
,
960 PrimaryLogPG
*_rPG
= dynamic_cast<PrimaryLogPG
*>(get_parent());
961 if (_rPG
&& !_rPG
->is_undersized() &&
962 (unsigned)get_parent()->whoami_shard().shard
>= ec_impl
->get_data_chunk_count())
963 op
.t
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
965 if (on_local_applied_sync
) {
966 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync
<< dendl
;
967 localt
.register_on_applied_sync(on_local_applied_sync
);
969 localt
.register_on_commit(
970 get_parent()->bless_context(
971 new SubWriteCommitted(
974 get_parent()->get_info().last_complete
, trace
)));
975 localt
.register_on_applied(
976 get_parent()->bless_context(
977 new SubWriteApplied(this, msg
, op
.tid
, op
.at_version
, trace
)));
978 vector
<ObjectStore::Transaction
> tls
;
980 tls
.push_back(std::move(op
.t
));
981 tls
.push_back(std::move(localt
));
982 get_parent()->queue_transactions(tls
, msg
);
985 void ECBackend::handle_sub_read(
988 ECSubReadReply
*reply
,
989 const ZTracer::Trace
&trace
)
991 trace
.event("handle sub read");
992 shard_id_t shard
= get_parent()->whoami_shard().shard
;
993 for(auto i
= op
.to_read
.begin();
994 i
!= op
.to_read
.end();
997 ECUtil::HashInfoRef hinfo
;
998 if (!get_parent()->get_pool().allows_ecoverwrites()) {
999 hinfo
= get_hash_info(i
->first
);
1002 get_parent()->clog_error() << "Corruption detected: object " << i
->first
1003 << " is missing hash_info";
1004 dout(5) << __func__
<< ": No hinfo for " << i
->first
<< dendl
;
1008 for (auto j
= i
->second
.begin(); j
!= i
->second
.end(); ++j
) {
1012 ghobject_t(i
->first
, ghobject_t::NO_GEN
, shard
),
1017 get_parent()->clog_error() << "Error " << r
1018 << " reading object "
1020 dout(5) << __func__
<< ": Error " << r
1021 << " reading " << i
->first
<< dendl
;
1024 dout(20) << __func__
<< " read request=" << j
->get
<1>() << " r=" << r
<< " len=" << bl
.length() << dendl
;
1025 reply
->buffers_read
[i
->first
].push_back(
1032 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1033 // This shows that we still need deep scrub because large enough files
1034 // are read in sections, so the digest check here won't be done here.
1035 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1036 // the state of our chunk in case other chunks could substitute.
1037 assert(hinfo
->has_chunk_hash());
1038 if ((bl
.length() == hinfo
->get_total_chunk_size()) &&
1039 (j
->get
<0>() == 0)) {
1040 dout(20) << __func__
<< ": Checking hash of " << i
->first
<< dendl
;
1043 if (h
.digest() != hinfo
->get_chunk_hash(shard
)) {
1044 get_parent()->clog_error() << "Bad hash for " << i
->first
<< " digest 0x"
1045 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
;
1046 dout(5) << __func__
<< ": Bad hash for " << i
->first
<< " digest 0x"
1047 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
<< dendl
;
1056 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1057 // the state of our chunk in case other chunks could substitute.
1058 reply
->buffers_read
.erase(i
->first
);
1059 reply
->errors
[i
->first
] = r
;
1061 for (set
<hobject_t
>::iterator i
= op
.attrs_to_read
.begin();
1062 i
!= op
.attrs_to_read
.end();
1064 dout(10) << __func__
<< ": fulfilling attr request on "
1066 if (reply
->errors
.count(*i
))
1068 int r
= store
->getattrs(
1071 *i
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1072 reply
->attrs_read
[*i
]);
1074 reply
->buffers_read
.erase(*i
);
1075 reply
->errors
[*i
] = r
;
1078 reply
->from
= get_parent()->whoami_shard();
1079 reply
->tid
= op
.tid
;
1082 void ECBackend::handle_sub_write_reply(
1084 const ECSubWriteReply
&op
,
1085 const ZTracer::Trace
&trace
)
1087 map
<ceph_tid_t
, Op
>::iterator i
= tid_to_op_map
.find(op
.tid
);
1088 assert(i
!= tid_to_op_map
.end());
1090 trace
.event("sub write committed");
1091 assert(i
->second
.pending_commit
.count(from
));
1092 i
->second
.pending_commit
.erase(from
);
1093 if (from
!= get_parent()->whoami_shard()) {
1094 get_parent()->update_peer_last_complete_ondisk(from
, op
.last_complete
);
1098 trace
.event("sub write applied");
1099 assert(i
->second
.pending_apply
.count(from
));
1100 i
->second
.pending_apply
.erase(from
);
1103 if (i
->second
.pending_apply
.empty() && i
->second
.on_all_applied
) {
1104 dout(10) << __func__
<< " Calling on_all_applied on " << i
->second
<< dendl
;
1105 i
->second
.on_all_applied
->complete(0);
1106 i
->second
.on_all_applied
= 0;
1107 i
->second
.trace
.event("ec write all applied");
1109 if (i
->second
.pending_commit
.empty() && i
->second
.on_all_commit
) {
1110 dout(10) << __func__
<< " Calling on_all_commit on " << i
->second
<< dendl
;
1111 i
->second
.on_all_commit
->complete(0);
1112 i
->second
.on_all_commit
= 0;
1113 i
->second
.trace
.event("ec write all committed");
1118 void ECBackend::handle_sub_read_reply(
1121 RecoveryMessages
*m
,
1122 const ZTracer::Trace
&trace
)
1124 trace
.event("ec sub read reply");
1125 dout(10) << __func__
<< ": reply " << op
<< dendl
;
1126 map
<ceph_tid_t
, ReadOp
>::iterator iter
= tid_to_read_map
.find(op
.tid
);
1127 if (iter
== tid_to_read_map
.end()) {
1129 dout(20) << __func__
<< ": dropped " << op
<< dendl
;
1132 ReadOp
&rop
= iter
->second
;
1133 for (auto i
= op
.buffers_read
.begin();
1134 i
!= op
.buffers_read
.end();
1136 assert(!op
.errors
.count(i
->first
)); // If attribute error we better not have sent a buffer
1137 if (!rop
.to_read
.count(i
->first
)) {
1138 // We canceled this read! @see filter_read_op
1139 dout(20) << __func__
<< " to_read skipping" << dendl
;
1142 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter
=
1143 rop
.to_read
.find(i
->first
)->second
.to_read
.begin();
1146 uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > >::iterator riter
=
1147 rop
.complete
[i
->first
].returned
.begin();
1148 for (list
<pair
<uint64_t, bufferlist
> >::iterator j
= i
->second
.begin();
1149 j
!= i
->second
.end();
1150 ++j
, ++req_iter
, ++riter
) {
1151 assert(req_iter
!= rop
.to_read
.find(i
->first
)->second
.to_read
.end());
1152 assert(riter
!= rop
.complete
[i
->first
].returned
.end());
1153 pair
<uint64_t, uint64_t> adjusted
=
1154 sinfo
.aligned_offset_len_to_chunk(
1155 make_pair(req_iter
->get
<0>(), req_iter
->get
<1>()));
1156 assert(adjusted
.first
== j
->first
);
1157 riter
->get
<2>()[from
].claim(j
->second
);
1160 for (auto i
= op
.attrs_read
.begin();
1161 i
!= op
.attrs_read
.end();
1163 assert(!op
.errors
.count(i
->first
)); // if read error better not have sent an attribute
1164 if (!rop
.to_read
.count(i
->first
)) {
1165 // We canceled this read! @see filter_read_op
1166 dout(20) << __func__
<< " to_read skipping" << dendl
;
1169 rop
.complete
[i
->first
].attrs
= map
<string
, bufferlist
>();
1170 (*(rop
.complete
[i
->first
].attrs
)).swap(i
->second
);
1172 for (auto i
= op
.errors
.begin();
1173 i
!= op
.errors
.end();
1175 rop
.complete
[i
->first
].errors
.insert(
1179 dout(20) << __func__
<< " shard=" << from
<< " error=" << i
->second
<< dendl
;
1182 map
<pg_shard_t
, set
<ceph_tid_t
> >::iterator siter
=
1183 shard_to_read_map
.find(from
);
1184 assert(siter
!= shard_to_read_map
.end());
1185 assert(siter
->second
.count(op
.tid
));
1186 siter
->second
.erase(op
.tid
);
1188 assert(rop
.in_progress
.count(from
));
1189 rop
.in_progress
.erase(from
);
1190 unsigned is_complete
= 0;
1191 // For redundant reads check for completion as each shard comes in,
1192 // or in a non-recovery read check for completion once all the shards read.
1193 if (rop
.do_redundant_reads
|| rop
.in_progress
.empty()) {
1194 for (map
<hobject_t
, read_result_t
>::const_iterator iter
=
1195 rop
.complete
.begin();
1196 iter
!= rop
.complete
.end();
1199 for (map
<pg_shard_t
, bufferlist
>::const_iterator j
=
1200 iter
->second
.returned
.front().get
<2>().begin();
1201 j
!= iter
->second
.returned
.front().get
<2>().end();
1203 have
.insert(j
->first
.shard
);
1204 dout(20) << __func__
<< " have shard=" << j
->first
.shard
<< dendl
;
1206 set
<int> want_to_read
, dummy_minimum
;
1207 get_want_to_read_shards(&want_to_read
);
1209 if ((err
= ec_impl
->minimum_to_decode(want_to_read
, have
, &dummy_minimum
)) < 0) {
1210 dout(20) << __func__
<< " minimum_to_decode failed" << dendl
;
1211 if (rop
.in_progress
.empty()) {
1212 // If we don't have enough copies and we haven't sent reads for all shards
1213 // we can send the rest of the reads, if any.
1214 if (!rop
.do_redundant_reads
) {
1215 int r
= send_all_remaining_reads(iter
->first
, rop
);
1217 // We added to in_progress and not incrementing is_complete
1220 // Couldn't read any additional shards so handle as completed with errors
1222 // We don't want to confuse clients / RBD with objectstore error
1223 // values in particular ENOENT. We may have different error returns
1224 // from different shards, so we'll return minimum_to_decode() error
1225 // (usually EIO) to reader. It is likely an error here is due to a
1227 rop
.complete
[iter
->first
].r
= err
;
1231 assert(rop
.complete
[iter
->first
].r
== 0);
1232 if (!rop
.complete
[iter
->first
].errors
.empty()) {
1233 if (cct
->_conf
->osd_read_ec_check_for_errors
) {
1234 dout(10) << __func__
<< ": Not ignoring errors, use one shard err=" << err
<< dendl
;
1235 err
= rop
.complete
[iter
->first
].errors
.begin()->second
;
1236 rop
.complete
[iter
->first
].r
= err
;
1238 get_parent()->clog_warn() << "Error(s) ignored for "
1239 << iter
->first
<< " enough copies available";
1240 dout(10) << __func__
<< " Error(s) ignored for " << iter
->first
1241 << " enough copies available" << dendl
;
1242 rop
.complete
[iter
->first
].errors
.clear();
1249 if (rop
.in_progress
.empty() || is_complete
== rop
.complete
.size()) {
1250 dout(20) << __func__
<< " Complete: " << rop
<< dendl
;
1251 rop
.trace
.event("ec read complete");
1252 complete_read_op(rop
, m
);
1254 dout(10) << __func__
<< " readop not complete: " << rop
<< dendl
;
1258 void ECBackend::complete_read_op(ReadOp
&rop
, RecoveryMessages
*m
)
1260 map
<hobject_t
, read_request_t
>::iterator reqiter
=
1261 rop
.to_read
.begin();
1262 map
<hobject_t
, read_result_t
>::iterator resiter
=
1263 rop
.complete
.begin();
1264 assert(rop
.to_read
.size() == rop
.complete
.size());
1265 for (; reqiter
!= rop
.to_read
.end(); ++reqiter
, ++resiter
) {
1266 if (reqiter
->second
.cb
) {
1267 pair
<RecoveryMessages
*, read_result_t
&> arg(
1268 m
, resiter
->second
);
1269 reqiter
->second
.cb
->complete(arg
);
1270 reqiter
->second
.cb
= NULL
;
1273 tid_to_read_map
.erase(rop
.tid
);
1276 struct FinishReadOp
: public GenContext
<ThreadPool::TPHandle
&> {
1279 FinishReadOp(ECBackend
*ec
, ceph_tid_t tid
) : ec(ec
), tid(tid
) {}
1280 void finish(ThreadPool::TPHandle
&handle
) override
{
1281 auto ropiter
= ec
->tid_to_read_map
.find(tid
);
1282 assert(ropiter
!= ec
->tid_to_read_map
.end());
1283 int priority
= ropiter
->second
.priority
;
1284 RecoveryMessages rm
;
1285 ec
->complete_read_op(ropiter
->second
, &rm
);
1286 ec
->dispatch_recovery_messages(rm
, priority
);
1290 void ECBackend::filter_read_op(
1291 const OSDMapRef
& osdmap
,
1294 set
<hobject_t
> to_cancel
;
1295 for (map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= op
.source_to_obj
.begin();
1296 i
!= op
.source_to_obj
.end();
1298 if (osdmap
->is_down(i
->first
.osd
)) {
1299 to_cancel
.insert(i
->second
.begin(), i
->second
.end());
1300 op
.in_progress
.erase(i
->first
);
1305 if (to_cancel
.empty())
1308 for (map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= op
.source_to_obj
.begin();
1309 i
!= op
.source_to_obj
.end();
1311 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
1312 j
!= i
->second
.end();
1314 if (to_cancel
.count(*j
))
1315 i
->second
.erase(j
++);
1319 if (i
->second
.empty()) {
1320 op
.source_to_obj
.erase(i
++);
1322 assert(!osdmap
->is_down(i
->first
.osd
));
1327 for (set
<hobject_t
>::iterator i
= to_cancel
.begin();
1328 i
!= to_cancel
.end();
1330 get_parent()->cancel_pull(*i
);
1332 assert(op
.to_read
.count(*i
));
1333 read_request_t
&req
= op
.to_read
.find(*i
)->second
;
1334 dout(10) << __func__
<< ": canceling " << req
1335 << " for obj " << *i
<< dendl
;
1340 op
.to_read
.erase(*i
);
1341 op
.complete
.erase(*i
);
1342 recovery_ops
.erase(*i
);
1345 if (op
.in_progress
.empty()) {
1346 get_parent()->schedule_recovery_work(
1347 get_parent()->bless_gencontext(
1348 new FinishReadOp(this, op
.tid
)));
1352 void ECBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
1354 set
<ceph_tid_t
> tids_to_filter
;
1355 for (map
<pg_shard_t
, set
<ceph_tid_t
> >::iterator
1356 i
= shard_to_read_map
.begin();
1357 i
!= shard_to_read_map
.end();
1359 if (osdmap
->is_down(i
->first
.osd
)) {
1360 tids_to_filter
.insert(i
->second
.begin(), i
->second
.end());
1361 shard_to_read_map
.erase(i
++);
1366 for (set
<ceph_tid_t
>::iterator i
= tids_to_filter
.begin();
1367 i
!= tids_to_filter
.end();
1369 map
<ceph_tid_t
, ReadOp
>::iterator j
= tid_to_read_map
.find(*i
);
1370 assert(j
!= tid_to_read_map
.end());
1371 filter_read_op(osdmap
, j
->second
);
1375 void ECBackend::on_change()
1377 dout(10) << __func__
<< dendl
;
1379 completed_to
= eversion_t();
1380 committed_to
= eversion_t();
1381 pipeline_state
.clear();
1382 waiting_reads
.clear();
1383 waiting_state
.clear();
1384 waiting_commit
.clear();
1385 for (auto &&op
: tid_to_op_map
) {
1386 cache
.release_write_pin(op
.second
.pin
);
1388 tid_to_op_map
.clear();
1390 for (map
<ceph_tid_t
, ReadOp
>::iterator i
= tid_to_read_map
.begin();
1391 i
!= tid_to_read_map
.end();
1393 dout(10) << __func__
<< ": cancelling " << i
->second
<< dendl
;
1394 for (map
<hobject_t
, read_request_t
>::iterator j
=
1395 i
->second
.to_read
.begin();
1396 j
!= i
->second
.to_read
.end();
1398 delete j
->second
.cb
;
1402 tid_to_read_map
.clear();
1403 in_progress_client_reads
.clear();
1404 shard_to_read_map
.clear();
1405 clear_recovery_state();
1408 void ECBackend::clear_recovery_state()
1410 recovery_ops
.clear();
1413 void ECBackend::on_flushed()
1417 void ECBackend::dump_recovery_info(Formatter
*f
) const
1419 f
->open_array_section("recovery_ops");
1420 for (map
<hobject_t
, RecoveryOp
>::const_iterator i
= recovery_ops
.begin();
1421 i
!= recovery_ops
.end();
1423 f
->open_object_section("op");
1428 f
->open_array_section("read_ops");
1429 for (map
<ceph_tid_t
, ReadOp
>::const_iterator i
= tid_to_read_map
.begin();
1430 i
!= tid_to_read_map
.end();
1432 f
->open_object_section("read_op");
1439 void ECBackend::submit_transaction(
1440 const hobject_t
&hoid
,
1441 const object_stat_sum_t
&delta_stats
,
1442 const eversion_t
&at_version
,
1443 PGTransactionUPtr
&&t
,
1444 const eversion_t
&trim_to
,
1445 const eversion_t
&roll_forward_to
,
1446 const vector
<pg_log_entry_t
> &log_entries
,
1447 boost::optional
<pg_hit_set_history_t
> &hset_history
,
1448 Context
*on_local_applied_sync
,
1449 Context
*on_all_applied
,
1450 Context
*on_all_commit
,
1453 OpRequestRef client_op
1456 assert(!tid_to_op_map
.count(tid
));
1457 Op
*op
= &(tid_to_op_map
[tid
]);
1459 op
->delta_stats
= delta_stats
;
1460 op
->version
= at_version
;
1461 op
->trim_to
= trim_to
;
1462 op
->roll_forward_to
= MAX(roll_forward_to
, committed_to
);
1463 op
->log_entries
= log_entries
;
1464 std::swap(op
->updated_hit_set_history
, hset_history
);
1465 op
->on_local_applied_sync
= on_local_applied_sync
;
1466 op
->on_all_applied
= on_all_applied
;
1467 op
->on_all_commit
= on_all_commit
;
1470 op
->client_op
= client_op
;
1472 op
->trace
= client_op
->pg_trace
;
1474 dout(10) << __func__
<< ": op " << *op
<< " starting" << dendl
;
1475 start_rmw(op
, std::move(t
));
1476 dout(10) << "onreadable_sync: " << op
->on_local_applied_sync
<< dendl
;
1479 void ECBackend::call_write_ordered(std::function
<void(void)> &&cb
) {
1480 if (!waiting_state
.empty()) {
1481 waiting_state
.back().on_write
.emplace_back(std::move(cb
));
1482 } else if (!waiting_reads
.empty()) {
1483 waiting_reads
.back().on_write
.emplace_back(std::move(cb
));
1485 // Nothing earlier in the pipeline, just call it
1490 void ECBackend::get_all_avail_shards(
1491 const hobject_t
&hoid
,
1493 map
<shard_id_t
, pg_shard_t
> &shards
,
1496 for (set
<pg_shard_t
>::const_iterator i
=
1497 get_parent()->get_acting_shards().begin();
1498 i
!= get_parent()->get_acting_shards().end();
1500 dout(10) << __func__
<< ": checking acting " << *i
<< dendl
;
1501 const pg_missing_t
&missing
= get_parent()->get_shard_missing(*i
);
1502 if (!missing
.is_missing(hoid
)) {
1503 assert(!have
.count(i
->shard
));
1504 have
.insert(i
->shard
);
1505 assert(!shards
.count(i
->shard
));
1506 shards
.insert(make_pair(i
->shard
, *i
));
1511 for (set
<pg_shard_t
>::const_iterator i
=
1512 get_parent()->get_backfill_shards().begin();
1513 i
!= get_parent()->get_backfill_shards().end();
1515 if (have
.count(i
->shard
)) {
1516 assert(shards
.count(i
->shard
));
1519 dout(10) << __func__
<< ": checking backfill " << *i
<< dendl
;
1520 assert(!shards
.count(i
->shard
));
1521 const pg_info_t
&info
= get_parent()->get_shard_info(*i
);
1522 const pg_missing_t
&missing
= get_parent()->get_shard_missing(*i
);
1523 if (hoid
< info
.last_backfill
&&
1524 !missing
.is_missing(hoid
)) {
1525 have
.insert(i
->shard
);
1526 shards
.insert(make_pair(i
->shard
, *i
));
1530 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator miter
=
1531 get_parent()->get_missing_loc_shards().find(hoid
);
1532 if (miter
!= get_parent()->get_missing_loc_shards().end()) {
1533 for (set
<pg_shard_t
>::iterator i
= miter
->second
.begin();
1534 i
!= miter
->second
.end();
1536 dout(10) << __func__
<< ": checking missing_loc " << *i
<< dendl
;
1537 auto m
= get_parent()->maybe_get_shard_missing(*i
);
1539 assert(!(*m
).is_missing(hoid
));
1541 have
.insert(i
->shard
);
1542 shards
.insert(make_pair(i
->shard
, *i
));
1548 int ECBackend::get_min_avail_to_read_shards(
1549 const hobject_t
&hoid
,
1550 const set
<int> &want
,
1552 bool do_redundant_reads
,
1553 set
<pg_shard_t
> *to_read
)
1555 // Make sure we don't do redundant reads for recovery
1556 assert(!for_recovery
|| !do_redundant_reads
);
1559 map
<shard_id_t
, pg_shard_t
> shards
;
1561 get_all_avail_shards(hoid
, have
, shards
, for_recovery
);
1564 int r
= ec_impl
->minimum_to_decode(want
, have
, &need
);
1568 if (do_redundant_reads
) {
1575 for (set
<int>::iterator i
= need
.begin();
1578 assert(shards
.count(shard_id_t(*i
)));
1579 to_read
->insert(shards
[shard_id_t(*i
)]);
1584 int ECBackend::get_remaining_shards(
1585 const hobject_t
&hoid
,
1586 const set
<int> &avail
,
1587 set
<pg_shard_t
> *to_read
,
1593 map
<shard_id_t
, pg_shard_t
> shards
;
1595 get_all_avail_shards(hoid
, have
, shards
, for_recovery
);
1597 for (set
<int>::iterator i
= have
.begin();
1600 assert(shards
.count(shard_id_t(*i
)));
1601 if (avail
.find(*i
) == avail
.end())
1602 to_read
->insert(shards
[shard_id_t(*i
)]);
1607 void ECBackend::start_read_op(
1609 map
<hobject_t
, read_request_t
> &to_read
,
1611 bool do_redundant_reads
,
1614 ceph_tid_t tid
= get_parent()->get_tid();
1615 assert(!tid_to_read_map
.count(tid
));
1616 auto &op
= tid_to_read_map
.emplace(
1624 std::move(to_read
))).first
->second
;
1625 dout(10) << __func__
<< ": starting " << op
<< dendl
;
1627 op
.trace
= _op
->pg_trace
;
1628 op
.trace
.event("start ec read");
1633 void ECBackend::do_read_op(ReadOp
&op
)
1635 int priority
= op
.priority
;
1636 ceph_tid_t tid
= op
.tid
;
1638 dout(10) << __func__
<< ": starting read " << op
<< dendl
;
1640 map
<pg_shard_t
, ECSubRead
> messages
;
1641 for (map
<hobject_t
, read_request_t
>::iterator i
= op
.to_read
.begin();
1642 i
!= op
.to_read
.end();
1644 bool need_attrs
= i
->second
.want_attrs
;
1645 for (set
<pg_shard_t
>::const_iterator j
= i
->second
.need
.begin();
1646 j
!= i
->second
.need
.end();
1649 messages
[*j
].attrs_to_read
.insert(i
->first
);
1652 op
.obj_to_source
[i
->first
].insert(*j
);
1653 op
.source_to_obj
[*j
].insert(i
->first
);
1655 for (list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >::const_iterator j
=
1656 i
->second
.to_read
.begin();
1657 j
!= i
->second
.to_read
.end();
1659 pair
<uint64_t, uint64_t> chunk_off_len
=
1660 sinfo
.aligned_offset_len_to_chunk(make_pair(j
->get
<0>(), j
->get
<1>()));
1661 for (set
<pg_shard_t
>::const_iterator k
= i
->second
.need
.begin();
1662 k
!= i
->second
.need
.end();
1664 messages
[*k
].to_read
[i
->first
].push_back(
1666 chunk_off_len
.first
,
1667 chunk_off_len
.second
,
1670 assert(!need_attrs
);
1674 for (map
<pg_shard_t
, ECSubRead
>::iterator i
= messages
.begin();
1675 i
!= messages
.end();
1677 op
.in_progress
.insert(i
->first
);
1678 shard_to_read_map
[i
->first
].insert(op
.tid
);
1679 i
->second
.tid
= tid
;
1680 MOSDECSubOpRead
*msg
= new MOSDECSubOpRead
;
1681 msg
->set_priority(priority
);
1683 get_parent()->whoami_spg_t().pgid
,
1685 msg
->map_epoch
= get_parent()->get_epoch();
1686 msg
->min_epoch
= get_parent()->get_interval_start_epoch();
1687 msg
->op
= i
->second
;
1688 msg
->op
.from
= get_parent()->whoami_shard();
1691 // initialize a child span for this shard
1692 msg
->trace
.init("ec sub read", nullptr, &op
.trace
);
1693 msg
->trace
.keyval("shard", i
->first
.shard
.id
);
1695 get_parent()->send_message_osd_cluster(
1698 get_parent()->get_epoch());
1700 dout(10) << __func__
<< ": started " << op
<< dendl
;
1703 ECUtil::HashInfoRef
ECBackend::get_hash_info(
1704 const hobject_t
&hoid
, bool checks
, const map
<string
,bufferptr
> *attrs
)
1706 dout(10) << __func__
<< ": Getting attr on " << hoid
<< dendl
;
1707 ECUtil::HashInfoRef ref
= unstable_hashinfo_registry
.lookup(hoid
);
1709 dout(10) << __func__
<< ": not in cache " << hoid
<< dendl
;
1711 int r
= store
->stat(
1713 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1715 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
1716 // XXX: What does it mean if there is no object on disk?
1718 dout(10) << __func__
<< ": found on disk, size " << st
.st_size
<< dendl
;
1721 map
<string
, bufferptr
>::const_iterator k
= attrs
->find(ECUtil::get_hinfo_key());
1722 if (k
== attrs
->end()) {
1723 dout(5) << __func__
<< " " << hoid
<< " missing hinfo attr" << dendl
;
1725 bl
.push_back(k
->second
);
1730 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1731 ECUtil::get_hinfo_key(),
1734 dout(5) << __func__
<< ": getattr failed: " << cpp_strerror(r
) << dendl
;
1735 bl
.clear(); // just in case
1738 if (bl
.length() > 0) {
1739 bufferlist::iterator bp
= bl
.begin();
1741 ::decode(hinfo
, bp
);
1743 dout(0) << __func__
<< ": Can't decode hinfo for " << hoid
<< dendl
;
1744 return ECUtil::HashInfoRef();
1746 if (checks
&& hinfo
.get_total_chunk_size() != (uint64_t)st
.st_size
) {
1747 dout(0) << __func__
<< ": Mismatch of total_chunk_size "
1748 << hinfo
.get_total_chunk_size() << dendl
;
1749 return ECUtil::HashInfoRef();
1751 } else if (st
.st_size
> 0) { // If empty object and no hinfo, create it
1752 return ECUtil::HashInfoRef();
1755 ref
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
1760 void ECBackend::start_rmw(Op
*op
, PGTransactionUPtr
&&t
)
1764 op
->plan
= ECTransaction::get_write_plan(
1767 [&](const hobject_t
&i
) {
1768 ECUtil::HashInfoRef ref
= get_hash_info(i
, false);
1770 derr
<< __func__
<< ": get_hash_info(" << i
<< ")"
1771 << " returned a null pointer and there is no "
1772 << " way to recover from such an error in this "
1773 << " context" << dendl
;
1778 get_parent()->get_dpp());
1780 dout(10) << __func__
<< ": " << *op
<< dendl
;
1782 waiting_state
.push_back(*op
);
1786 bool ECBackend::try_state_to_reads()
1788 if (waiting_state
.empty())
1791 Op
*op
= &(waiting_state
.front());
1792 if (op
->requires_rmw() && pipeline_state
.cache_invalid()) {
1793 assert(get_parent()->get_pool().allows_ecoverwrites());
1794 dout(20) << __func__
<< ": blocking " << *op
1795 << " because it requires an rmw and the cache is invalid "
1801 if (op
->invalidates_cache()) {
1802 dout(20) << __func__
<< ": invalidating cache after this op"
1804 pipeline_state
.invalidate();
1805 op
->using_cache
= false;
1807 op
->using_cache
= pipeline_state
.caching_enabled();
1810 waiting_state
.pop_front();
1811 waiting_reads
.push_back(*op
);
1813 if (op
->using_cache
) {
1814 cache
.open_write_pin(op
->pin
);
1817 for (auto &&hpair
: op
->plan
.will_write
) {
1818 auto to_read_plan_iter
= op
->plan
.to_read
.find(hpair
.first
);
1819 const extent_set
&to_read_plan
=
1820 to_read_plan_iter
== op
->plan
.to_read
.end() ?
1822 to_read_plan_iter
->second
;
1824 extent_set remote_read
= cache
.reserve_extents_for_rmw(
1830 extent_set pending_read
= to_read_plan
;
1831 pending_read
.subtract(remote_read
);
1833 if (!remote_read
.empty()) {
1834 op
->remote_read
[hpair
.first
] = std::move(remote_read
);
1836 if (!pending_read
.empty()) {
1837 op
->pending_read
[hpair
.first
] = std::move(pending_read
);
1841 op
->remote_read
= op
->plan
.to_read
;
1844 dout(10) << __func__
<< ": " << *op
<< dendl
;
1846 if (!op
->remote_read
.empty()) {
1847 assert(get_parent()->get_pool().allows_ecoverwrites());
1848 objects_read_async_no_cache(
1850 [this, op
](map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
1851 for (auto &&i
: results
) {
1852 op
->remote_read_result
.emplace(i
.first
, i
.second
.second
);
1861 bool ECBackend::try_reads_to_commit()
1863 if (waiting_reads
.empty())
1865 Op
*op
= &(waiting_reads
.front());
1866 if (op
->read_in_progress())
1868 waiting_reads
.pop_front();
1869 waiting_commit
.push_back(*op
);
1871 dout(10) << __func__
<< ": starting commit on " << *op
<< dendl
;
1872 dout(20) << __func__
<< ": " << cache
<< dendl
;
1874 get_parent()->apply_stats(
1878 if (op
->using_cache
) {
1879 for (auto &&hpair
: op
->pending_read
) {
1880 op
->remote_read_result
[hpair
.first
].insert(
1881 cache
.get_remaining_extents_for_rmw(
1886 op
->pending_read
.clear();
1888 assert(op
->pending_read
.empty());
1891 map
<shard_id_t
, ObjectStore::Transaction
> trans
;
1892 for (set
<pg_shard_t
>::const_iterator i
=
1893 get_parent()->get_actingbackfill_shards().begin();
1894 i
!= get_parent()->get_actingbackfill_shards().end();
1899 op
->trace
.event("start ec write");
1901 map
<hobject_t
,extent_map
> written
;
1903 ECTransaction::generate_transactions(
1906 get_parent()->get_info().pgid
.pgid
,
1907 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
1909 op
->remote_read_result
,
1914 &(op
->temp_cleared
),
1915 get_parent()->get_dpp());
1918 dout(20) << __func__
<< ": " << cache
<< dendl
;
1919 dout(20) << __func__
<< ": written: " << written
<< dendl
;
1920 dout(20) << __func__
<< ": op: " << *op
<< dendl
;
1922 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1923 for (auto &&i
: op
->log_entries
) {
1924 if (i
.requires_kraken()) {
1925 derr
<< __func__
<< ": log entry " << i
<< " requires kraken"
1926 << " but overwrites are not enabled!" << dendl
;
1932 map
<hobject_t
,extent_set
> written_set
;
1933 for (auto &&i
: written
) {
1934 written_set
[i
.first
] = i
.second
.get_interval_set();
1936 dout(20) << __func__
<< ": written_set: " << written_set
<< dendl
;
1937 assert(written_set
== op
->plan
.will_write
);
1939 if (op
->using_cache
) {
1940 for (auto &&hpair
: written
) {
1941 dout(20) << __func__
<< ": " << hpair
<< dendl
;
1942 cache
.present_rmw_update(hpair
.first
, op
->pin
, hpair
.second
);
1945 op
->remote_read
.clear();
1946 op
->remote_read_result
.clear();
1948 dout(10) << "onreadable_sync: " << op
->on_local_applied_sync
<< dendl
;
1949 ObjectStore::Transaction empty
;
1950 bool should_write_local
= false;
1951 ECSubWrite local_write_op
;
1952 for (set
<pg_shard_t
>::const_iterator i
=
1953 get_parent()->get_actingbackfill_shards().begin();
1954 i
!= get_parent()->get_actingbackfill_shards().end();
1956 op
->pending_apply
.insert(*i
);
1957 op
->pending_commit
.insert(*i
);
1958 map
<shard_id_t
, ObjectStore::Transaction
>::iterator iter
=
1959 trans
.find(i
->shard
);
1960 assert(iter
!= trans
.end());
1961 bool should_send
= get_parent()->should_send_op(*i
, op
->hoid
);
1962 const pg_stat_t
&stats
=
1965 parent
->get_shard_info().find(*i
)->second
.stats
;
1968 get_parent()->whoami_shard(),
1973 should_send
? iter
->second
: empty
,
1976 op
->roll_forward_to
,
1978 op
->updated_hit_set_history
,
1983 ZTracer::Trace trace
;
1985 // initialize a child span for this shard
1986 trace
.init("ec sub write", nullptr, &op
->trace
);
1987 trace
.keyval("shard", i
->shard
.id
);
1990 if (*i
== get_parent()->whoami_shard()) {
1991 should_write_local
= true;
1992 local_write_op
.claim(sop
);
1994 MOSDECSubOpWrite
*r
= new MOSDECSubOpWrite(sop
);
1995 r
->pgid
= spg_t(get_parent()->primary_spg_t().pgid
, i
->shard
);
1996 r
->map_epoch
= get_parent()->get_epoch();
1997 r
->min_epoch
= get_parent()->get_interval_start_epoch();
1999 get_parent()->send_message_osd_cluster(
2000 i
->osd
, r
, get_parent()->get_epoch());
2003 if (should_write_local
) {
2005 get_parent()->whoami_shard(),
2009 op
->on_local_applied_sync
);
2010 op
->on_local_applied_sync
= 0;
2013 for (auto i
= op
->on_write
.begin();
2014 i
!= op
->on_write
.end();
2015 op
->on_write
.erase(i
++)) {
2022 bool ECBackend::try_finish_rmw()
2024 if (waiting_commit
.empty())
2026 Op
*op
= &(waiting_commit
.front());
2027 if (op
->write_in_progress())
2029 waiting_commit
.pop_front();
2031 dout(10) << __func__
<< ": " << *op
<< dendl
;
2032 dout(20) << __func__
<< ": " << cache
<< dendl
;
2034 if (op
->roll_forward_to
> completed_to
)
2035 completed_to
= op
->roll_forward_to
;
2036 if (op
->version
> committed_to
)
2037 committed_to
= op
->version
;
2039 if (get_osdmap()->require_osd_release
>= CEPH_RELEASE_KRAKEN
) {
2040 if (op
->version
> get_parent()->get_log().get_can_rollback_to() &&
2041 waiting_reads
.empty() &&
2042 waiting_commit
.empty()) {
2043 // submit a dummy transaction to kick the rollforward
2044 auto tid
= get_parent()->get_tid();
2045 Op
*nop
= &(tid_to_op_map
[tid
]);
2046 nop
->hoid
= op
->hoid
;
2047 nop
->trim_to
= op
->trim_to
;
2048 nop
->roll_forward_to
= op
->version
;
2050 nop
->reqid
= op
->reqid
;
2051 waiting_reads
.push_back(*nop
);
2055 if (op
->using_cache
) {
2056 cache
.release_write_pin(op
->pin
);
2058 tid_to_op_map
.erase(op
->tid
);
2060 if (waiting_reads
.empty() &&
2061 waiting_commit
.empty()) {
2062 pipeline_state
.clear();
2063 dout(20) << __func__
<< ": clearing pipeline_state "
2070 void ECBackend::check_ops()
2072 while (try_state_to_reads() ||
2073 try_reads_to_commit() ||
2077 int ECBackend::objects_read_sync(
2078 const hobject_t
&hoid
,
2087 void ECBackend::objects_read_async(
2088 const hobject_t
&hoid
,
2089 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2090 pair
<bufferlist
*, Context
*> > > &to_read
,
2091 Context
*on_complete
,
2094 map
<hobject_t
,std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > >
2099 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2100 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
2104 pair
<uint64_t, uint64_t> tmp
=
2105 sinfo
.offset_len_to_stripe_bounds(
2106 make_pair(i
->first
.get
<0>(), i
->first
.get
<1>()));
2109 esnew
.insert(tmp
.first
, tmp
.second
);
2111 flags
|= i
->first
.get
<2>();
2115 auto &offsets
= reads
[hoid
];
2116 for (auto j
= es
.begin();
2130 list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2131 pair
<bufferlist
*, Context
*> > > to_read
;
2132 unique_ptr
<Context
> on_complete
;
2133 cb(const cb
&) = delete;
2134 cb(cb
&&) = default;
2136 const hobject_t
&hoid
,
2137 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2138 pair
<bufferlist
*, Context
*> > > &to_read
,
2139 Context
*on_complete
)
2143 on_complete(on_complete
) {}
2144 void operator()(map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
2145 auto dpp
= ec
->get_parent()->get_dpp();
2146 ldpp_dout(dpp
, 20) << "objects_read_async_cb: got: " << results
2148 ldpp_dout(dpp
, 20) << "objects_read_async_cb: cache: " << ec
->cache
2151 auto &got
= results
[hoid
];
2154 for (auto &&read
: to_read
) {
2155 if (got
.first
< 0) {
2156 if (read
.second
.second
) {
2157 read
.second
.second
->complete(got
.first
);
2162 assert(read
.second
.first
);
2163 uint64_t offset
= read
.first
.get
<0>();
2164 uint64_t length
= read
.first
.get
<1>();
2165 auto range
= got
.second
.get_containing_range(offset
, length
);
2166 assert(range
.first
!= range
.second
);
2167 assert(range
.first
.get_off() <= offset
);
2169 (offset
+ length
) <=
2170 (range
.first
.get_off() + range
.first
.get_len()));
2171 read
.second
.first
->substr_of(
2172 range
.first
.get_val(),
2173 offset
- range
.first
.get_off(),
2175 if (read
.second
.second
) {
2176 read
.second
.second
->complete(length
);
2177 read
.second
.second
= nullptr;
2183 on_complete
.release()->complete(r
);
2187 for (auto &&i
: to_read
) {
2188 delete i
.second
.second
;
2193 objects_read_and_reconstruct(
2196 make_gen_lambda_context
<
2197 map
<hobject_t
,pair
<int, extent_map
> > &&, cb
>(
2204 struct CallClientContexts
:
2205 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
2208 ECBackend::ClientAsyncReadStatus
*status
;
2209 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
2213 ECBackend::ClientAsyncReadStatus
*status
,
2214 const list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > &to_read
)
2215 : hoid(hoid
), ec(ec
), status(status
), to_read(to_read
) {}
2216 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
2217 ECBackend::read_result_t
&res
= in
.second
;
2221 assert(res
.returned
.size() == to_read
.size());
2223 assert(res
.errors
.empty());
2224 for (auto &&read
: to_read
) {
2225 pair
<uint64_t, uint64_t> adjusted
=
2226 ec
->sinfo
.offset_len_to_stripe_bounds(
2227 make_pair(read
.get
<0>(), read
.get
<1>()));
2228 assert(res
.returned
.front().get
<0>() == adjusted
.first
&&
2229 res
.returned
.front().get
<1>() == adjusted
.second
);
2230 map
<int, bufferlist
> to_decode
;
2232 for (map
<pg_shard_t
, bufferlist
>::iterator j
=
2233 res
.returned
.front().get
<2>().begin();
2234 j
!= res
.returned
.front().get
<2>().end();
2236 to_decode
[j
->first
.shard
].claim(j
->second
);
2238 int r
= ECUtil::decode(
2250 read
.get
<0>() - adjusted
.first
,
2252 bl
.length() - (read
.get
<0>() - adjusted
.first
)));
2254 read
.get
<0>(), trimmed
.length(), std::move(trimmed
));
2255 res
.returned
.pop_front();
2258 status
->complete_object(hoid
, res
.r
, std::move(result
));
2263 void ECBackend::objects_read_and_reconstruct(
2264 const map
<hobject_t
,
2265 std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >
2268 GenContextURef
<map
<hobject_t
,pair
<int, extent_map
> > &&> &&func
)
2270 in_progress_client_reads
.emplace_back(
2271 reads
.size(), std::move(func
));
2272 if (!reads
.size()) {
2277 set
<int> want_to_read
;
2278 get_want_to_read_shards(&want_to_read
);
2280 map
<hobject_t
, read_request_t
> for_read_op
;
2281 for (auto &&to_read
: reads
) {
2282 set
<pg_shard_t
> shards
;
2283 int r
= get_min_avail_to_read_shards(
2291 CallClientContexts
*c
= new CallClientContexts(
2294 &(in_progress_client_reads
.back()),
2307 CEPH_MSG_PRIO_DEFAULT
,
2315 int ECBackend::send_all_remaining_reads(
2316 const hobject_t
&hoid
,
2319 set
<int> already_read
;
2320 const set
<pg_shard_t
>& ots
= rop
.obj_to_source
[hoid
];
2321 for (set
<pg_shard_t
>::iterator i
= ots
.begin(); i
!= ots
.end(); ++i
)
2322 already_read
.insert(i
->shard
);
2323 dout(10) << __func__
<< " have/error shards=" << already_read
<< dendl
;
2324 set
<pg_shard_t
> shards
;
2325 int r
= get_remaining_shards(hoid
, already_read
, &shards
, rop
.for_recovery
);
2331 dout(10) << __func__
<< " Read remaining shards " << shards
<< dendl
;
2333 // TODOSAM: this doesn't seem right
2334 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > offsets
=
2335 rop
.to_read
.find(hoid
)->second
.to_read
;
2336 GenContext
<pair
<RecoveryMessages
*, read_result_t
& > &> *c
=
2337 rop
.to_read
.find(hoid
)->second
.cb
;
2339 map
<hobject_t
, read_request_t
> for_read_op
;
2349 rop
.to_read
.swap(for_read_op
);
2354 int ECBackend::objects_get_attrs(
2355 const hobject_t
&hoid
,
2356 map
<string
, bufferlist
> *out
)
2358 int r
= store
->getattrs(
2360 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2365 for (map
<string
, bufferlist
>::iterator i
= out
->begin();
2368 if (ECUtil::is_hinfo_key_string(i
->first
))
2376 void ECBackend::rollback_append(
2377 const hobject_t
&hoid
,
2379 ObjectStore::Transaction
*t
)
2381 assert(old_size
% sinfo
.get_stripe_width() == 0);
2384 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2385 sinfo
.aligned_logical_offset_to_chunk_offset(
2389 void ECBackend::be_deep_scrub(
2390 const hobject_t
&poid
,
2392 ScrubMap::object
&o
,
2393 ThreadPool::TPHandle
&handle
) {
2394 bufferhash
h(-1); // we always used -1
2396 uint64_t stride
= cct
->_conf
->osd_deep_scrub_stride
;
2397 if (stride
% sinfo
.get_chunk_size())
2398 stride
+= sinfo
.get_chunk_size() - (stride
% sinfo
.get_chunk_size());
2401 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
| CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
2405 handle
.reset_tp_timeout();
2409 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2415 if (bl
.length() % sinfo
.get_chunk_size()) {
2421 if ((unsigned)r
< stride
)
2426 dout(0) << "_scan_list " << poid
<< " got "
2427 << r
<< " on read, read_error" << dendl
;
2428 o
.read_error
= true;
2432 ECUtil::HashInfoRef hinfo
= get_hash_info(poid
, false, &o
.attrs
);
2434 dout(0) << "_scan_list " << poid
<< " could not retrieve hash info" << dendl
;
2435 o
.read_error
= true;
2436 o
.digest_present
= false;
2439 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2440 assert(hinfo
->has_chunk_hash());
2441 if (hinfo
->get_total_chunk_size() != pos
) {
2442 dout(0) << "_scan_list " << poid
<< " got incorrect size on read" << dendl
;
2443 o
.ec_size_mismatch
= true;
2447 if (hinfo
->get_chunk_hash(get_parent()->whoami_shard().shard
) != h
.digest()) {
2448 dout(0) << "_scan_list " << poid
<< " got incorrect hash on read" << dendl
;
2449 o
.ec_hash_mismatch
= true;
2453 /* We checked above that we match our own stored hash. We cannot
2454 * send a hash of the actual object, so instead we simply send
2455 * our locally stored hash of shard 0 on the assumption that if
2456 * we match our chunk hash and our recollection of the hash for
2457 * chunk 0 matches that of our peers, there is likely no corruption.
2459 o
.digest
= hinfo
->get_chunk_hash(0);
2460 o
.digest_present
= true;
2462 /* Hack! We must be using partial overwrites, and partial overwrites
2463 * don't support deep-scrub yet
2466 o
.digest_present
= true;
2470 o
.omap_digest
= seed
;
2471 o
.omap_digest_present
= true;