1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
27 #include "PrimaryLogPG.h"
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
33 #define dout_prefix _prefix(_dout, this)
34 static ostream
& _prefix(std::ostream
*_dout
, ECBackend
*pgb
) {
35 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
38 struct ECRecoveryHandle
: public PGBackend::RecoveryHandle
{
39 list
<ECBackend::RecoveryOp
> ops
;
42 ostream
&operator<<(ostream
&lhs
, const ECBackend::pipeline_state_t
&rhs
) {
43 switch (rhs
.pipeline_state
) {
44 case ECBackend::pipeline_state_t::CACHE_VALID
:
45 return lhs
<< "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID
:
47 return lhs
<< "CACHE_INVALID";
49 assert(0 == "invalid pipeline state");
51 return lhs
; // unreachable
// Debug formatter: prints a shard -> buffer map as (shard, length) pairs so
// that log lines show buffer sizes rather than raw buffer contents.
// NOTE(review): the loop condition/increment, the separator output between
// elements, and the closing braces were dropped by the extraction that
// produced this file — restore them from the upstream source before building.
54 static ostream
&operator<<(ostream
&lhs
, const map
<pg_shard_t
, bufferlist
> &rhs
)
57 for (map
<pg_shard_t
, bufferlist
>::const_iterator i
= rhs
.begin();
62 lhs
// Only the length of each bufferlist is printed, not its bytes.
<< make_pair(i
->first
, i
->second
.length());
// Debug formatter: same as the pg_shard_t overload above, but keyed by raw
// shard index — prints (index, length) pairs instead of buffer contents.
// NOTE(review): the loop condition/increment, element separator and closing
// braces were dropped by the extraction — restore from the upstream source.
67 static ostream
&operator<<(ostream
&lhs
, const map
<int, bufferlist
> &rhs
)
70 for (map
<int, bufferlist
>::const_iterator i
= rhs
.begin();
75 lhs
<< make_pair(i
->first
, i
->second
.length());
80 static ostream
&operator<<(
82 const boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &rhs
)
84 return lhs
<< "(" << rhs
.get
<0>() << ", "
85 << rhs
.get
<1>() << ", " << rhs
.get
<2>() << ")";
88 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_request_t
&rhs
)
90 return lhs
<< "read_request_t(to_read=[" << rhs
.to_read
<< "]"
91 << ", need=" << rhs
.need
92 << ", want_attrs=" << rhs
.want_attrs
// Debug formatter for a completed (or failed) EC read: return code, per-shard
// errors, optional attrs, and the list of returned extents.
// NOTE(review): original line 100 is missing — `attrs` is dereferenced with
// .get(), so the upstream code presumably guards this with `if (rhs.attrs)`
// and prints an alternative when unset; restore before building.
96 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_result_t
&rhs
)
98 lhs
<< "read_result_t(r=" << rhs
.r
99 << ", errors=" << rhs
.errors
;
101 lhs
<< ", attrs=" << rhs
.attrs
.get();
105 return lhs
<< ", returned=" << rhs
.returned
<< ")";
// Debug formatter for an in-flight ReadOp: tid, the originating client op (if
// any), the requested extents, completed results, and bookkeeping maps that
// relate objects to the shards being read.
// NOTE(review): original lines 112 and 114 are missing — presumably the label
// printed before rhs.op->get_req()->print() and the closing brace of the if;
// restore from the upstream source.
108 ostream
&operator<<(ostream
&lhs
, const ECBackend::ReadOp
&rhs
)
110 lhs
<< "ReadOp(tid=" << rhs
.tid
;
111 if (rhs
.op
&& rhs
.op
->get_req()) {
113 rhs
.op
->get_req()->print(lhs
);
115 return lhs
<< ", to_read=" << rhs
.to_read
116 << ", complete=" << rhs
.complete
117 << ", priority=" << rhs
.priority
118 << ", obj_to_source=" << rhs
.obj_to_source
119 << ", source_to_obj=" << rhs
.source_to_obj
120 << ", in_progress=" << rhs
.in_progress
<< ")";
123 void ECBackend::ReadOp::dump(Formatter
*f
) const
125 f
->dump_unsigned("tid", tid
);
126 if (op
&& op
->get_req()) {
127 f
->dump_stream("op") << *(op
->get_req());
129 f
->dump_stream("to_read") << to_read
;
130 f
->dump_stream("complete") << complete
;
131 f
->dump_int("priority", priority
);
132 f
->dump_stream("obj_to_source") << obj_to_source
;
133 f
->dump_stream("source_to_obj") << source_to_obj
;
134 f
->dump_stream("in_progress") << in_progress
;
// Debug formatter for an EC write Op: version/trim/tid/reqid, the client op
// (if any), temp-object bookkeeping, outstanding reads, and the per-shard
// apply/commit sets plus the read/write plan.
// NOTE(review): original lines 147 and 158-160 are missing — presumably the
// closing brace of the if and the tail of the insertion chain (closing ")"
// and return/brace); restore from the upstream source before building.
137 ostream
&operator<<(ostream
&lhs
, const ECBackend::Op
&rhs
)
139 lhs
<< "Op(" << rhs
.hoid
140 << " v=" << rhs
.version
141 << " tt=" << rhs
.trim_to
142 << " tid=" << rhs
.tid
143 << " reqid=" << rhs
.reqid
;
144 if (rhs
.client_op
&& rhs
.client_op
->get_req()) {
145 lhs
<< " client_op=";
146 rhs
.client_op
->get_req()->print(lhs
);
148 lhs
<< " roll_forward_to=" << rhs
.roll_forward_to
149 << " temp_added=" << rhs
.temp_added
150 << " temp_cleared=" << rhs
.temp_cleared
151 << " pending_read=" << rhs
.pending_read
152 << " remote_read=" << rhs
.remote_read
153 << " remote_read_result=" << rhs
.remote_read_result
154 << " pending_apply=" << rhs
.pending_apply
155 << " pending_commit=" << rhs
.pending_commit
156 << " plan.to_read=" << rhs
.plan
.to_read
157 << " plan.will_write=" << rhs
.plan
.will_write
// Debug formatter for a RecoveryOp: target object, which shards are missing
// it, recovery metadata/progress, obc refcount, state-machine state, and the
// shards we are still waiting on.
// NOTE(review): original lines 166 and 175-176 are missing — at least one
// printed field (between hoid and missing_on) and the closing ")"/brace were
// dropped by the extraction; restore from the upstream source.
162 ostream
&operator<<(ostream
&lhs
, const ECBackend::RecoveryOp
&rhs
)
164 return lhs
<< "RecoveryOp("
165 << "hoid=" << rhs
.hoid
167 << " missing_on=" << rhs
.missing_on
168 << " missing_on_shards=" << rhs
.missing_on_shards
169 << " recovery_info=" << rhs
.recovery_info
170 << " recovery_progress=" << rhs
.recovery_progress
171 << " obc refcount=" << rhs
.obc
.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs
.state
)
173 << " waiting_on_pushes=" << rhs
.waiting_on_pushes
174 << " extent_requested=" << rhs
.extent_requested
178 void ECBackend::RecoveryOp::dump(Formatter
*f
) const
180 f
->dump_stream("hoid") << hoid
;
181 f
->dump_stream("v") << v
;
182 f
->dump_stream("missing_on") << missing_on
;
183 f
->dump_stream("missing_on_shards") << missing_on_shards
;
184 f
->dump_stream("recovery_info") << recovery_info
;
185 f
->dump_stream("recovery_progress") << recovery_progress
;
186 f
->dump_stream("state") << tostr(state
);
187 f
->dump_stream("waiting_on_pushes") << waiting_on_pushes
;
188 f
->dump_stream("extent_requested") << extent_requested
;
// Constructor: wires the erasure-code plugin and stripe geometry into the
// generic PGBackend.  The assert checks the invariant the rest of the file
// relies on: data_chunk_count * per-chunk size must exactly cover the stripe.
// NOTE(review): original lines 193, 195-196 and 200 are missing — the
// CephContext/ObjectStore/coll_t parameter declarations referenced by the
// PGBackend(...) initializer, and presumably the ec_impl member initializer;
// restore from the upstream source before building.
191 ECBackend::ECBackend(
192 PGBackend::Listener
*pg
,
194 ObjectStore::CollectionHandle
&ch
,
197 ErasureCodeInterfaceRef ec_impl
,
198 uint64_t stripe_width
)
199 : PGBackend(cct
, pg
, store
, coll
, ch
),
201 sinfo(ec_impl
->get_data_chunk_count(), stripe_width
) {
202 assert((ec_impl
->get_data_chunk_count() *
203 ec_impl
->get_chunk_size(stripe_width
)) == stripe_width
);
206 PGBackend::RecoveryHandle
*ECBackend::open_recovery_op()
208 return new ECRecoveryHandle
;
// Abort recovery of `hoid` after a read error: log the failure, drop the
// RecoveryOp, report the failing shards to the parent, re-add the object to
// the missing set at its recorded version, and release any degraded waiters.
// NOTE(review): original lines 213, 218, 222-223 and 226 are missing — among
// them the declaration of `fl` (presumably a list<pg_shard_t> collecting the
// erroring shards) and several braces; restore from the upstream source.
211 void ECBackend::_failed_push(const hobject_t
&hoid
,
212 pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
)
214 ECBackend::read_result_t
&res
= in
.second
;
215 dout(10) << __func__
<< ": Read error " << hoid
<< " r="
216 << res
.r
<< " errors=" << res
.errors
<< dendl
;
217 dout(10) << __func__
<< ": canceling recovery op for obj " << hoid
// The version is captured before the erase below invalidates the entry.
219 assert(recovery_ops
.count(hoid
));
220 eversion_t v
= recovery_ops
[hoid
].v
;
221 recovery_ops
.erase(hoid
);
// Collect every shard that reported an error so the parent can avoid them.
224 for (auto&& i
: res
.errors
) {
225 fl
.push_back(i
.first
);
227 get_parent()->failed_push(fl
, hoid
);
228 get_parent()->backfill_add_missing(hoid
, v
);
229 get_parent()->finish_degraded_object(hoid
);
// Completion callback for a recovery read: on any error it cancels the
// recovery of this object via _failed_push(); otherwise it asserts a single
// returned extent and forwards it to handle_recovery_read_complete().
// NOTE(review): original lines 234-236 (member declarations, presumably
// `ECBackend *pg; hobject_t hoid;`), 243-244 (early return / brace after the
// error path) and 247-253 (the arguments of the forwarded call and closing
// braces) are missing — restore from the upstream source before building.
232 struct OnRecoveryReadComplete
:
233 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
237 OnRecoveryReadComplete(ECBackend
*pg
, const hobject_t
&hoid
)
238 : pg(pg
), hoid(hoid
) {}
239 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
240 ECBackend::read_result_t
&res
= in
.second
;
// Any nonzero return code or per-shard error aborts this object's recovery.
241 if (!(res
.r
== 0 && res
.errors
.empty())) {
242 pg
->_failed_push(hoid
, in
);
245 assert(res
.returned
.size() == 1);
246 pg
->handle_recovery_read_complete(
// Accumulator for one pass of the recovery state machine: reads to issue,
// pushes / push-replies to send per shard, and a transaction to queue.  It is
// filled by the handle_recovery_* / continue_recovery_op paths and drained by
// dispatch_recovery_messages().
// NOTE(review): heavily truncated by the extraction — the full declaration of
// the `reads` map (original line 255), the signature of the read-queueing
// method around lines 258-261, and the read_request_t / OnRecoveryReadComplete
// argument lists (lines 265-276) are missing; restore from upstream.
254 struct RecoveryMessages
{
256 ECBackend::read_request_t
> reads
;
259 const hobject_t
&hoid
, uint64_t off
, uint64_t len
,
260 const set
<pg_shard_t
> &need
,
// Queue a single (off, len) extent read; flag 0 = no fadvise hints.
262 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
263 to_read
.push_back(boost::make_tuple(off
, len
, 0));
// Only one outstanding recovery read per object is allowed.
264 assert(!reads
.count(hoid
));
268 ECBackend::read_request_t(
272 new OnRecoveryReadComplete(
// Outgoing recovery traffic, keyed by destination shard.
277 map
<pg_shard_t
, vector
<PushOp
> > pushes
;
278 map
<pg_shard_t
, vector
<PushReplyOp
> > push_replies
;
279 ObjectStore::Transaction t
;
280 RecoveryMessages() {}
281 ~RecoveryMessages(){}
284 void ECBackend::handle_recovery_push(
289 if (get_parent()->check_failsafe_full(ss
)) {
290 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
294 bool oneshot
= op
.before_progress
.first
&& op
.after_progress
.data_complete
;
297 tobj
= ghobject_t(op
.soid
, ghobject_t::NO_GEN
,
298 get_parent()->whoami_shard().shard
);
300 tobj
= ghobject_t(get_parent()->get_temp_recovery_object(op
.soid
,
303 get_parent()->whoami_shard().shard
);
304 if (op
.before_progress
.first
) {
305 dout(10) << __func__
<< ": Adding oid "
306 << tobj
.hobj
<< " in the temp collection" << dendl
;
307 add_temp_obj(tobj
.hobj
);
311 if (op
.before_progress
.first
) {
312 m
->t
.remove(coll
, tobj
);
313 m
->t
.touch(coll
, tobj
);
316 if (!op
.data_included
.empty()) {
317 uint64_t start
= op
.data_included
.range_start();
318 uint64_t end
= op
.data_included
.range_end();
319 assert(op
.data
.length() == (end
- start
));
328 assert(op
.data
.length() == 0);
331 if (op
.before_progress
.first
) {
332 assert(op
.attrset
.count(string("_")));
339 if (op
.after_progress
.data_complete
&& !oneshot
) {
340 dout(10) << __func__
<< ": Removing oid "
341 << tobj
.hobj
<< " from the temp collection" << dendl
;
342 clear_temp_obj(tobj
.hobj
);
343 m
->t
.remove(coll
, ghobject_t(
344 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
345 m
->t
.collection_move_rename(
348 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
350 if (op
.after_progress
.data_complete
) {
351 if ((get_parent()->pgb_is_primary())) {
352 assert(recovery_ops
.count(op
.soid
));
353 assert(recovery_ops
[op
.soid
].obc
);
354 get_parent()->on_local_recover(
357 recovery_ops
[op
.soid
].obc
,
361 get_parent()->on_local_recover(
369 m
->push_replies
[get_parent()->primary_shard()].push_back(PushReplyOp());
370 m
->push_replies
[get_parent()->primary_shard()].back().soid
= op
.soid
;
// Handle a push acknowledgement from shard `from`: if the op is still live,
// clear that shard from waiting_on_pushes and advance the recovery state
// machine.  A reply for an unknown object is ignored (op already cancelled).
// NOTE(review): original lines 375-377 (the remaining parameters, presumably
// `pg_shard_t from, RecoveryMessages *m`) and 379/384 (the early-return body
// and closing brace) are missing — restore from the upstream source.
373 void ECBackend::handle_recovery_push_reply(
374 const PushReplyOp
&op
,
378 if (!recovery_ops
.count(op
.soid
))
380 RecoveryOp
&rop
= recovery_ops
[op
.soid
];
381 assert(rop
.waiting_on_pushes
.count(from
));
382 rop
.waiting_on_pushes
.erase(from
);
383 continue_recovery_op(rop
, m
);
386 void ECBackend::handle_recovery_read_complete(
387 const hobject_t
&hoid
,
388 boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &to_read
,
389 boost::optional
<map
<string
, bufferlist
> > attrs
,
392 dout(10) << __func__
<< ": returned " << hoid
<< " "
393 << "(" << to_read
.get
<0>()
394 << ", " << to_read
.get
<1>()
395 << ", " << to_read
.get
<2>()
398 assert(recovery_ops
.count(hoid
));
399 RecoveryOp
&op
= recovery_ops
[hoid
];
400 assert(op
.returned_data
.empty());
401 map
<int, bufferlist
*> target
;
402 for (set
<shard_id_t
>::iterator i
= op
.missing_on_shards
.begin();
403 i
!= op
.missing_on_shards
.end();
405 target
[*i
] = &(op
.returned_data
[*i
]);
407 map
<int, bufferlist
> from
;
408 for(map
<pg_shard_t
, bufferlist
>::iterator i
= to_read
.get
<2>().begin();
409 i
!= to_read
.get
<2>().end();
411 from
[i
->first
.shard
].claim(i
->second
);
413 dout(10) << __func__
<< ": " << from
<< dendl
;
414 int r
= ECUtil::decode(sinfo
, ec_impl
, from
, target
);
417 op
.xattrs
.swap(*attrs
);
420 // attrs only reference the origin bufferlist (decode from
421 // ECSubReadReply message) whose size is much greater than attrs
422 // in recovery. If obc cache it (get_obc maybe cache the attr),
423 // this causes the whole origin bufferlist would not be free
424 // until obc is evicted from obc cache. So rebuild the
425 // bufferlist before cache it.
426 for (map
<string
, bufferlist
>::iterator it
= op
.xattrs
.begin();
427 it
!= op
.xattrs
.end();
429 it
->second
.rebuild();
431 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
432 // of the backend (see bug #12983)
433 map
<string
, bufferlist
> sanitized_attrs(op
.xattrs
);
434 sanitized_attrs
.erase(ECUtil::get_hinfo_key());
435 op
.obc
= get_parent()->get_obc(hoid
, sanitized_attrs
);
437 op
.recovery_info
.size
= op
.obc
->obs
.oi
.size
;
438 op
.recovery_info
.oi
= op
.obc
->obs
.oi
;
441 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
442 if (op
.obc
->obs
.oi
.size
> 0) {
443 assert(op
.xattrs
.count(ECUtil::get_hinfo_key()));
444 bufferlist::iterator bp
= op
.xattrs
[ECUtil::get_hinfo_key()].begin();
447 op
.hinfo
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
449 assert(op
.xattrs
.size());
451 continue_recovery_op(op
, m
);
// Transaction-completion context: once the recovery transaction commits,
// sends the queued MOSDPGPushReply messages (keyed by destination OSD id) via
// the parent listener.  The destructor path below iterates `replies` again —
// presumably to release any messages never sent; confirm against upstream.
// NOTE(review): truncated by the extraction — the `epoch` member (around
// original line 456), ctor parameter line 460, the swap/init of `replies`
// (462-463), the loop conditions at 466-467/474-475 and the loop bodies'
// tails are missing; restore from the upstream source before building.
454 struct SendPushReplies
: public Context
{
455 PGBackend::Listener
*l
;
457 map
<int, MOSDPGPushReply
*> replies
;
459 PGBackend::Listener
*l
,
461 map
<int, MOSDPGPushReply
*> &in
) : l(l
), epoch(epoch
) {
464 void finish(int) override
{
465 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
468 l
->send_message_osd_cluster(i
->first
, i
->second
, epoch
);
472 ~SendPushReplies() override
{
473 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
482 void ECBackend::dispatch_recovery_messages(RecoveryMessages
&m
, int priority
)
484 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= m
.pushes
.begin();
486 m
.pushes
.erase(i
++)) {
487 MOSDPGPush
*msg
= new MOSDPGPush();
488 msg
->set_priority(priority
);
489 msg
->map_epoch
= get_parent()->get_epoch();
490 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
491 msg
->from
= get_parent()->whoami_shard();
492 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
493 msg
->pushes
.swap(i
->second
);
494 msg
->compute_cost(cct
);
495 get_parent()->send_message(
499 map
<int, MOSDPGPushReply
*> replies
;
500 for (map
<pg_shard_t
, vector
<PushReplyOp
> >::iterator i
=
501 m
.push_replies
.begin();
502 i
!= m
.push_replies
.end();
503 m
.push_replies
.erase(i
++)) {
504 MOSDPGPushReply
*msg
= new MOSDPGPushReply();
505 msg
->set_priority(priority
);
506 msg
->map_epoch
= get_parent()->get_epoch();
507 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
508 msg
->from
= get_parent()->whoami_shard();
509 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
510 msg
->replies
.swap(i
->second
);
511 msg
->compute_cost(cct
);
512 replies
.insert(make_pair(i
->first
.osd
, msg
));
515 if (!replies
.empty()) {
516 (m
.t
).register_on_complete(
517 get_parent()->bless_context(
520 get_parent()->get_epoch(),
522 get_parent()->queue_transaction(std::move(m
.t
));
534 void ECBackend::continue_recovery_op(
538 dout(10) << __func__
<< ": continuing " << op
<< dendl
;
541 case RecoveryOp::IDLE
: {
543 op
.state
= RecoveryOp::READING
;
544 assert(!op
.recovery_progress
.data_complete
);
545 set
<int> want(op
.missing_on_shards
.begin(), op
.missing_on_shards
.end());
546 uint64_t from
= op
.recovery_progress
.data_recovered_to
;
547 uint64_t amount
= get_recovery_chunk_size();
549 if (op
.recovery_progress
.first
&& op
.obc
) {
550 /* We've got the attrs and the hinfo, might as well use them */
551 op
.hinfo
= get_hash_info(op
.hoid
);
553 op
.xattrs
= op
.obc
->attr_cache
;
554 ::encode(*(op
.hinfo
), op
.xattrs
[ECUtil::get_hinfo_key()]);
557 set
<pg_shard_t
> to_read
;
558 int r
= get_min_avail_to_read_shards(
559 op
.hoid
, want
, true, false, &to_read
);
561 // we must have lost a recovery source
562 assert(!op
.recovery_progress
.first
);
563 dout(10) << __func__
<< ": canceling recovery op for obj " << op
.hoid
565 get_parent()->cancel_pull(op
.hoid
);
566 recovery_ops
.erase(op
.hoid
);
572 op
.recovery_progress
.data_recovered_to
,
575 op
.recovery_progress
.first
&& !op
.obc
);
576 op
.extent_requested
= make_pair(
579 dout(10) << __func__
<< ": IDLE return " << op
<< dendl
;
582 case RecoveryOp::READING
: {
583 // read completed, start write
584 assert(op
.xattrs
.size());
585 assert(op
.returned_data
.size());
586 op
.state
= RecoveryOp::WRITING
;
587 ObjectRecoveryProgress after_progress
= op
.recovery_progress
;
588 after_progress
.data_recovered_to
+= op
.extent_requested
.second
;
589 after_progress
.first
= false;
590 if (after_progress
.data_recovered_to
>= op
.obc
->obs
.oi
.size
) {
591 after_progress
.data_recovered_to
=
592 sinfo
.logical_to_next_stripe_offset(
593 op
.obc
->obs
.oi
.size
);
594 after_progress
.data_complete
= true;
596 for (set
<pg_shard_t
>::iterator mi
= op
.missing_on
.begin();
597 mi
!= op
.missing_on
.end();
599 assert(op
.returned_data
.count(mi
->shard
));
600 m
->pushes
[*mi
].push_back(PushOp());
601 PushOp
&pop
= m
->pushes
[*mi
].back();
604 pop
.data
= op
.returned_data
[mi
->shard
];
605 dout(10) << __func__
<< ": before_progress=" << op
.recovery_progress
606 << ", after_progress=" << after_progress
607 << ", pop.data.length()=" << pop
.data
.length()
608 << ", size=" << op
.obc
->obs
.oi
.size
<< dendl
;
611 sinfo
.aligned_logical_offset_to_chunk_offset(
612 after_progress
.data_recovered_to
-
613 op
.recovery_progress
.data_recovered_to
)
615 if (pop
.data
.length())
616 pop
.data_included
.insert(
617 sinfo
.aligned_logical_offset_to_chunk_offset(
618 op
.recovery_progress
.data_recovered_to
),
621 if (op
.recovery_progress
.first
) {
622 pop
.attrset
= op
.xattrs
;
624 pop
.recovery_info
= op
.recovery_info
;
625 pop
.before_progress
= op
.recovery_progress
;
626 pop
.after_progress
= after_progress
;
627 if (*mi
!= get_parent()->primary_shard())
628 get_parent()->begin_peer_recover(
632 op
.returned_data
.clear();
633 op
.waiting_on_pushes
= op
.missing_on
;
634 op
.recovery_progress
= after_progress
;
635 dout(10) << __func__
<< ": READING return " << op
<< dendl
;
638 case RecoveryOp::WRITING
: {
639 if (op
.waiting_on_pushes
.empty()) {
640 if (op
.recovery_progress
.data_complete
) {
641 op
.state
= RecoveryOp::COMPLETE
;
642 for (set
<pg_shard_t
>::iterator i
= op
.missing_on
.begin();
643 i
!= op
.missing_on
.end();
645 if (*i
!= get_parent()->primary_shard()) {
646 dout(10) << __func__
<< ": on_peer_recover on " << *i
647 << ", obj " << op
.hoid
<< dendl
;
648 get_parent()->on_peer_recover(
654 object_stat_sum_t stat
;
655 stat
.num_bytes_recovered
= op
.recovery_info
.size
;
656 stat
.num_keys_recovered
= 0; // ??? op ... omap_entries.size(); ?
657 stat
.num_objects_recovered
= 1;
658 get_parent()->on_global_recover(op
.hoid
, stat
, false);
659 dout(10) << __func__
<< ": WRITING return " << op
<< dendl
;
660 recovery_ops
.erase(op
.hoid
);
663 op
.state
= RecoveryOp::IDLE
;
664 dout(10) << __func__
<< ": WRITING continue " << op
<< dendl
;
670 // should never be called once complete
671 case RecoveryOp::COMPLETE
:
679 void ECBackend::run_recovery_op(
683 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
685 for (list
<RecoveryOp
>::iterator i
= h
->ops
.begin();
688 dout(10) << __func__
<< ": starting " << *i
<< dendl
;
689 assert(!recovery_ops
.count(i
->hoid
));
690 RecoveryOp
&op
= recovery_ops
.insert(make_pair(i
->hoid
, *i
)).first
->second
;
691 continue_recovery_op(op
, &m
);
694 dispatch_recovery_messages(m
, priority
);
695 send_recovery_deletes(priority
, h
->deletes
);
699 int ECBackend::recover_object(
700 const hobject_t
&hoid
,
702 ObjectContextRef head
,
703 ObjectContextRef obc
,
706 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
707 h
->ops
.push_back(RecoveryOp());
709 h
->ops
.back().hoid
= hoid
;
710 h
->ops
.back().obc
= obc
;
711 h
->ops
.back().recovery_info
.soid
= hoid
;
712 h
->ops
.back().recovery_info
.version
= v
;
714 h
->ops
.back().recovery_info
.size
= obc
->obs
.oi
.size
;
715 h
->ops
.back().recovery_info
.oi
= obc
->obs
.oi
;
717 if (hoid
.is_snap()) {
720 h
->ops
.back().recovery_info
.ss
= obc
->ssc
->snapset
;
723 h
->ops
.back().recovery_info
.ss
= head
->ssc
->snapset
;
725 assert(0 == "neither obc nor head set for a snap object");
728 h
->ops
.back().recovery_progress
.omap_complete
= true;
729 for (set
<pg_shard_t
>::const_iterator i
=
730 get_parent()->get_actingbackfill_shards().begin();
731 i
!= get_parent()->get_actingbackfill_shards().end();
733 dout(10) << "checking " << *i
<< dendl
;
734 if (get_parent()->get_shard_missing(*i
).is_missing(hoid
)) {
735 h
->ops
.back().missing_on
.insert(*i
);
736 h
->ops
.back().missing_on_shards
.insert(i
->shard
);
739 dout(10) << __func__
<< ": built op " << h
->ops
.back() << dendl
;
743 bool ECBackend::can_handle_while_inactive(
749 bool ECBackend::_handle_message(
752 dout(10) << __func__
<< ": " << *_op
->get_req() << dendl
;
753 int priority
= _op
->get_req()->get_priority();
754 switch (_op
->get_req()->get_type()) {
755 case MSG_OSD_EC_WRITE
: {
756 // NOTE: this is non-const because handle_sub_write modifies the embedded
757 // ObjectStore::Transaction in place (and then std::move's it). It does
758 // not conflict with ECSubWrite's operator<<.
759 MOSDECSubOpWrite
*op
= static_cast<MOSDECSubOpWrite
*>(
760 _op
->get_nonconst_req());
761 handle_sub_write(op
->op
.from
, _op
, op
->op
, _op
->pg_trace
);
764 case MSG_OSD_EC_WRITE_REPLY
: {
765 const MOSDECSubOpWriteReply
*op
= static_cast<const MOSDECSubOpWriteReply
*>(
767 handle_sub_write_reply(op
->op
.from
, op
->op
, _op
->pg_trace
);
770 case MSG_OSD_EC_READ
: {
771 const MOSDECSubOpRead
*op
= static_cast<const MOSDECSubOpRead
*>(_op
->get_req());
772 MOSDECSubOpReadReply
*reply
= new MOSDECSubOpReadReply
;
773 reply
->pgid
= get_parent()->primary_spg_t();
774 reply
->map_epoch
= get_parent()->get_epoch();
775 reply
->min_epoch
= get_parent()->get_interval_start_epoch();
776 handle_sub_read(op
->op
.from
, op
->op
, &(reply
->op
), _op
->pg_trace
);
777 reply
->trace
= _op
->pg_trace
;
778 get_parent()->send_message_osd_cluster(
779 op
->op
.from
.osd
, reply
, get_parent()->get_epoch());
782 case MSG_OSD_EC_READ_REPLY
: {
783 // NOTE: this is non-const because handle_sub_read_reply steals resulting
784 // buffers. It does not conflict with ECSubReadReply operator<<.
785 MOSDECSubOpReadReply
*op
= static_cast<MOSDECSubOpReadReply
*>(
786 _op
->get_nonconst_req());
788 handle_sub_read_reply(op
->op
.from
, op
->op
, &rm
, _op
->pg_trace
);
789 dispatch_recovery_messages(rm
, priority
);
792 case MSG_OSD_PG_PUSH
: {
793 const MOSDPGPush
*op
= static_cast<const MOSDPGPush
*>(_op
->get_req());
795 for (vector
<PushOp
>::const_iterator i
= op
->pushes
.begin();
796 i
!= op
->pushes
.end();
798 handle_recovery_push(*i
, &rm
);
800 dispatch_recovery_messages(rm
, priority
);
803 case MSG_OSD_PG_PUSH_REPLY
: {
804 const MOSDPGPushReply
*op
= static_cast<const MOSDPGPushReply
*>(
807 for (vector
<PushReplyOp
>::const_iterator i
= op
->replies
.begin();
808 i
!= op
->replies
.end();
810 handle_recovery_push_reply(*i
, op
->from
, &rm
);
812 dispatch_recovery_messages(rm
, priority
);
821 struct SubWriteCommitted
: public Context
{
826 eversion_t last_complete
;
827 const ZTracer::Trace trace
;
833 eversion_t last_complete
,
834 const ZTracer::Trace
&trace
)
835 : pg(pg
), msg(msg
), tid(tid
),
836 version(version
), last_complete(last_complete
), trace(trace
) {}
837 void finish(int) override
{
839 msg
->mark_event("sub_op_committed");
840 pg
->sub_write_committed(tid
, version
, last_complete
, trace
);
843 void ECBackend::sub_write_committed(
844 ceph_tid_t tid
, eversion_t version
, eversion_t last_complete
,
845 const ZTracer::Trace
&trace
) {
846 if (get_parent()->pgb_is_primary()) {
847 ECSubWriteReply reply
;
849 reply
.last_complete
= last_complete
;
850 reply
.committed
= true;
851 reply
.from
= get_parent()->whoami_shard();
852 handle_sub_write_reply(
853 get_parent()->whoami_shard(),
856 get_parent()->update_last_complete_ondisk(last_complete
);
857 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
858 r
->pgid
= get_parent()->primary_spg_t();
859 r
->map_epoch
= get_parent()->get_epoch();
860 r
->min_epoch
= get_parent()->get_interval_start_epoch();
862 r
->op
.last_complete
= last_complete
;
863 r
->op
.committed
= true;
864 r
->op
.from
= get_parent()->whoami_shard();
865 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
867 r
->trace
.event("sending sub op commit");
868 get_parent()->send_message_osd_cluster(
869 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
873 struct SubWriteApplied
: public Context
{
878 const ZTracer::Trace trace
;
884 const ZTracer::Trace
&trace
)
885 : pg(pg
), msg(msg
), tid(tid
), version(version
), trace(trace
) {}
886 void finish(int) override
{
888 msg
->mark_event("sub_op_applied");
889 pg
->sub_write_applied(tid
, version
, trace
);
892 void ECBackend::sub_write_applied(
893 ceph_tid_t tid
, eversion_t version
,
894 const ZTracer::Trace
&trace
) {
895 parent
->op_applied(version
);
896 if (get_parent()->pgb_is_primary()) {
897 ECSubWriteReply reply
;
898 reply
.from
= get_parent()->whoami_shard();
900 reply
.applied
= true;
901 handle_sub_write_reply(
902 get_parent()->whoami_shard(),
905 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
906 r
->pgid
= get_parent()->primary_spg_t();
907 r
->map_epoch
= get_parent()->get_epoch();
908 r
->min_epoch
= get_parent()->get_interval_start_epoch();
909 r
->op
.from
= get_parent()->whoami_shard();
911 r
->op
.applied
= true;
912 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
914 r
->trace
.event("sending sub op apply");
915 get_parent()->send_message_osd_cluster(
916 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
920 void ECBackend::handle_sub_write(
924 const ZTracer::Trace
&trace
,
925 Context
*on_local_applied_sync
)
929 trace
.event("handle_sub_write");
930 assert(!get_parent()->get_log().get_missing().is_missing(op
.soid
));
931 if (!get_parent()->pgb_is_primary())
932 get_parent()->update_stats(op
.stats
);
933 ObjectStore::Transaction localt
;
934 if (!op
.temp_added
.empty()) {
935 add_temp_objs(op
.temp_added
);
938 for (set
<hobject_t
>::iterator i
= op
.temp_removed
.begin();
939 i
!= op
.temp_removed
.end();
941 dout(10) << __func__
<< ": removing object " << *i
942 << " since we won't get the transaction" << dendl
;
948 get_parent()->whoami_shard().shard
));
951 clear_temp_objs(op
.temp_removed
);
952 get_parent()->log_operation(
954 op
.updated_hit_set_history
,
960 PrimaryLogPG
*_rPG
= dynamic_cast<PrimaryLogPG
*>(get_parent());
961 if (_rPG
&& !_rPG
->is_undersized() &&
962 (unsigned)get_parent()->whoami_shard().shard
>= ec_impl
->get_data_chunk_count())
963 op
.t
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
965 if (on_local_applied_sync
) {
966 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync
<< dendl
;
967 localt
.register_on_applied_sync(on_local_applied_sync
);
969 localt
.register_on_commit(
970 get_parent()->bless_context(
971 new SubWriteCommitted(
974 get_parent()->get_info().last_complete
, trace
)));
975 localt
.register_on_applied(
976 get_parent()->bless_context(
977 new SubWriteApplied(this, msg
, op
.tid
, op
.at_version
, trace
)));
978 vector
<ObjectStore::Transaction
> tls
;
980 tls
.push_back(std::move(op
.t
));
981 tls
.push_back(std::move(localt
));
982 get_parent()->queue_transactions(tls
, msg
);
985 void ECBackend::handle_sub_read(
988 ECSubReadReply
*reply
,
989 const ZTracer::Trace
&trace
)
991 trace
.event("handle sub read");
992 shard_id_t shard
= get_parent()->whoami_shard().shard
;
993 for(auto i
= op
.to_read
.begin();
994 i
!= op
.to_read
.end();
997 ECUtil::HashInfoRef hinfo
;
998 if (!get_parent()->get_pool().allows_ecoverwrites()) {
999 hinfo
= get_hash_info(i
->first
);
1002 get_parent()->clog_error() << "Corruption detected: object " << i
->first
1003 << " is missing hash_info";
1004 dout(5) << __func__
<< ": No hinfo for " << i
->first
<< dendl
;
1008 for (auto j
= i
->second
.begin(); j
!= i
->second
.end(); ++j
) {
1012 ghobject_t(i
->first
, ghobject_t::NO_GEN
, shard
),
1017 get_parent()->clog_error() << "Error " << r
1018 << " reading object "
1020 dout(5) << __func__
<< ": Error " << r
1021 << " reading " << i
->first
<< dendl
;
1024 dout(20) << __func__
<< " read request=" << j
->get
<1>() << " r=" << r
<< " len=" << bl
.length() << dendl
;
1025 reply
->buffers_read
[i
->first
].push_back(
1032 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1033 // This shows that we still need deep scrub because large enough files
1034 // are read in sections, so the digest check here won't be done here.
1035 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1036 // the state of our chunk in case other chunks could substitute.
1037 assert(hinfo
->has_chunk_hash());
1038 if ((bl
.length() == hinfo
->get_total_chunk_size()) &&
1039 (j
->get
<0>() == 0)) {
1040 dout(20) << __func__
<< ": Checking hash of " << i
->first
<< dendl
;
1043 if (h
.digest() != hinfo
->get_chunk_hash(shard
)) {
1044 get_parent()->clog_error() << "Bad hash for " << i
->first
<< " digest 0x"
1045 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
;
1046 dout(5) << __func__
<< ": Bad hash for " << i
->first
<< " digest 0x"
1047 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
<< dendl
;
1056 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1057 // the state of our chunk in case other chunks could substitute.
1058 reply
->buffers_read
.erase(i
->first
);
1059 reply
->errors
[i
->first
] = r
;
1061 for (set
<hobject_t
>::iterator i
= op
.attrs_to_read
.begin();
1062 i
!= op
.attrs_to_read
.end();
1064 dout(10) << __func__
<< ": fulfilling attr request on "
1066 if (reply
->errors
.count(*i
))
1068 int r
= store
->getattrs(
1071 *i
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1072 reply
->attrs_read
[*i
]);
1074 reply
->buffers_read
.erase(*i
);
1075 reply
->errors
[*i
] = r
;
1078 reply
->from
= get_parent()->whoami_shard();
1079 reply
->tid
= op
.tid
;
1082 void ECBackend::handle_sub_write_reply(
1084 const ECSubWriteReply
&op
,
1085 const ZTracer::Trace
&trace
)
1087 map
<ceph_tid_t
, Op
>::iterator i
= tid_to_op_map
.find(op
.tid
);
1088 assert(i
!= tid_to_op_map
.end());
1090 trace
.event("sub write committed");
1091 assert(i
->second
.pending_commit
.count(from
));
1092 i
->second
.pending_commit
.erase(from
);
1093 if (from
!= get_parent()->whoami_shard()) {
1094 get_parent()->update_peer_last_complete_ondisk(from
, op
.last_complete
);
1098 trace
.event("sub write applied");
1099 assert(i
->second
.pending_apply
.count(from
));
1100 i
->second
.pending_apply
.erase(from
);
1103 if (i
->second
.pending_apply
.empty() && i
->second
.on_all_applied
) {
1104 dout(10) << __func__
<< " Calling on_all_applied on " << i
->second
<< dendl
;
1105 i
->second
.on_all_applied
->complete(0);
1106 i
->second
.on_all_applied
= 0;
1107 i
->second
.trace
.event("ec write all applied");
1109 if (i
->second
.pending_commit
.empty() && i
->second
.on_all_commit
) {
1110 dout(10) << __func__
<< " Calling on_all_commit on " << i
->second
<< dendl
;
1111 i
->second
.on_all_commit
->complete(0);
1112 i
->second
.on_all_commit
= 0;
1113 i
->second
.trace
.event("ec write all committed");
1118 void ECBackend::handle_sub_read_reply(
1121 RecoveryMessages
*m
,
1122 const ZTracer::Trace
&trace
)
1124 trace
.event("ec sub read reply");
1125 dout(10) << __func__
<< ": reply " << op
<< dendl
;
1126 map
<ceph_tid_t
, ReadOp
>::iterator iter
= tid_to_read_map
.find(op
.tid
);
1127 if (iter
== tid_to_read_map
.end()) {
1129 dout(20) << __func__
<< ": dropped " << op
<< dendl
;
1132 ReadOp
&rop
= iter
->second
;
1133 for (auto i
= op
.buffers_read
.begin();
1134 i
!= op
.buffers_read
.end();
1136 assert(!op
.errors
.count(i
->first
)); // If attribute error we better not have sent a buffer
1137 if (!rop
.to_read
.count(i
->first
)) {
1138 // We canceled this read! @see filter_read_op
1139 dout(20) << __func__
<< " to_read skipping" << dendl
;
1142 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter
=
1143 rop
.to_read
.find(i
->first
)->second
.to_read
.begin();
1146 uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > >::iterator riter
=
1147 rop
.complete
[i
->first
].returned
.begin();
1148 for (list
<pair
<uint64_t, bufferlist
> >::iterator j
= i
->second
.begin();
1149 j
!= i
->second
.end();
1150 ++j
, ++req_iter
, ++riter
) {
1151 assert(req_iter
!= rop
.to_read
.find(i
->first
)->second
.to_read
.end());
1152 assert(riter
!= rop
.complete
[i
->first
].returned
.end());
1153 pair
<uint64_t, uint64_t> adjusted
=
1154 sinfo
.aligned_offset_len_to_chunk(
1155 make_pair(req_iter
->get
<0>(), req_iter
->get
<1>()));
1156 assert(adjusted
.first
== j
->first
);
1157 riter
->get
<2>()[from
].claim(j
->second
);
1160 for (auto i
= op
.attrs_read
.begin();
1161 i
!= op
.attrs_read
.end();
1163 assert(!op
.errors
.count(i
->first
)); // if read error better not have sent an attribute
1164 if (!rop
.to_read
.count(i
->first
)) {
1165 // We canceled this read! @see filter_read_op
1166 dout(20) << __func__
<< " to_read skipping" << dendl
;
1169 rop
.complete
[i
->first
].attrs
= map
<string
, bufferlist
>();
1170 (*(rop
.complete
[i
->first
].attrs
)).swap(i
->second
);
1172 for (auto i
= op
.errors
.begin();
1173 i
!= op
.errors
.end();
1175 rop
.complete
[i
->first
].errors
.insert(
1179 dout(20) << __func__
<< " shard=" << from
<< " error=" << i
->second
<< dendl
;
1182 map
<pg_shard_t
, set
<ceph_tid_t
> >::iterator siter
=
1183 shard_to_read_map
.find(from
);
1184 assert(siter
!= shard_to_read_map
.end());
1185 assert(siter
->second
.count(op
.tid
));
1186 siter
->second
.erase(op
.tid
);
1188 assert(rop
.in_progress
.count(from
));
1189 rop
.in_progress
.erase(from
);
1190 unsigned is_complete
= 0;
1191 // For redundant reads check for completion as each shard comes in,
1192 // or in a non-recovery read check for completion once all the shards read.
1193 if (rop
.do_redundant_reads
|| rop
.in_progress
.empty()) {
1194 for (map
<hobject_t
, read_result_t
>::const_iterator iter
=
1195 rop
.complete
.begin();
1196 iter
!= rop
.complete
.end();
1199 for (map
<pg_shard_t
, bufferlist
>::const_iterator j
=
1200 iter
->second
.returned
.front().get
<2>().begin();
1201 j
!= iter
->second
.returned
.front().get
<2>().end();
1203 have
.insert(j
->first
.shard
);
1204 dout(20) << __func__
<< " have shard=" << j
->first
.shard
<< dendl
;
1206 set
<int> want_to_read
, dummy_minimum
;
1207 get_want_to_read_shards(&want_to_read
);
1209 if ((err
= ec_impl
->minimum_to_decode(want_to_read
, have
, &dummy_minimum
)) < 0) {
1210 dout(20) << __func__
<< " minimum_to_decode failed" << dendl
;
1211 if (rop
.in_progress
.empty()) {
1212 // If we don't have enough copies and we haven't sent reads for all shards
1213 // we can send the rest of the reads, if any.
1214 if (!rop
.do_redundant_reads
) {
1215 int r
= send_all_remaining_reads(iter
->first
, rop
);
1217 // We added to in_progress and not incrementing is_complete
1220 // Couldn't read any additional shards so handle as completed with errors
1222 // We don't want to confuse clients / RBD with objectstore error
1223 // values in particular ENOENT. We may have different error returns
1224 // from different shards, so we'll return minimum_to_decode() error
1225 // (usually EIO) to reader. It is likely an error here is due to a
1227 rop
.complete
[iter
->first
].r
= err
;
1231 assert(rop
.complete
[iter
->first
].r
== 0);
1232 if (!rop
.complete
[iter
->first
].errors
.empty()) {
1233 if (cct
->_conf
->osd_read_ec_check_for_errors
) {
1234 dout(10) << __func__
<< ": Not ignoring errors, use one shard err=" << err
<< dendl
;
1235 err
= rop
.complete
[iter
->first
].errors
.begin()->second
;
1236 rop
.complete
[iter
->first
].r
= err
;
1238 get_parent()->clog_warn() << "Error(s) ignored for "
1239 << iter
->first
<< " enough copies available";
1240 dout(10) << __func__
<< " Error(s) ignored for " << iter
->first
1241 << " enough copies available" << dendl
;
1242 rop
.complete
[iter
->first
].errors
.clear();
1249 if (rop
.in_progress
.empty() || is_complete
== rop
.complete
.size()) {
1250 dout(20) << __func__
<< " Complete: " << rop
<< dendl
;
1251 rop
.trace
.event("ec read complete");
1252 complete_read_op(rop
, m
);
1254 dout(10) << __func__
<< " readop not complete: " << rop
<< dendl
;
1258 void ECBackend::complete_read_op(ReadOp
&rop
, RecoveryMessages
*m
)
1260 map
<hobject_t
, read_request_t
>::iterator reqiter
=
1261 rop
.to_read
.begin();
1262 map
<hobject_t
, read_result_t
>::iterator resiter
=
1263 rop
.complete
.begin();
1264 assert(rop
.to_read
.size() == rop
.complete
.size());
1265 for (; reqiter
!= rop
.to_read
.end(); ++reqiter
, ++resiter
) {
1266 if (reqiter
->second
.cb
) {
1267 pair
<RecoveryMessages
*, read_result_t
&> arg(
1268 m
, resiter
->second
);
1269 reqiter
->second
.cb
->complete(arg
);
1270 reqiter
->second
.cb
= NULL
;
1273 tid_to_read_map
.erase(rop
.tid
);
1276 struct FinishReadOp
: public GenContext
<ThreadPool::TPHandle
&> {
1279 FinishReadOp(ECBackend
*ec
, ceph_tid_t tid
) : ec(ec
), tid(tid
) {}
1280 void finish(ThreadPool::TPHandle
&handle
) override
{
1281 auto ropiter
= ec
->tid_to_read_map
.find(tid
);
1282 assert(ropiter
!= ec
->tid_to_read_map
.end());
1283 int priority
= ropiter
->second
.priority
;
1284 RecoveryMessages rm
;
1285 ec
->complete_read_op(ropiter
->second
, &rm
);
1286 ec
->dispatch_recovery_messages(rm
, priority
);
1290 void ECBackend::filter_read_op(
1291 const OSDMapRef
& osdmap
,
1294 set
<hobject_t
> to_cancel
;
1295 for (map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= op
.source_to_obj
.begin();
1296 i
!= op
.source_to_obj
.end();
1298 if (osdmap
->is_down(i
->first
.osd
)) {
1299 to_cancel
.insert(i
->second
.begin(), i
->second
.end());
1300 op
.in_progress
.erase(i
->first
);
1305 if (to_cancel
.empty())
1308 for (map
<pg_shard_t
, set
<hobject_t
> >::iterator i
= op
.source_to_obj
.begin();
1309 i
!= op
.source_to_obj
.end();
1311 for (set
<hobject_t
>::iterator j
= i
->second
.begin();
1312 j
!= i
->second
.end();
1314 if (to_cancel
.count(*j
))
1315 i
->second
.erase(j
++);
1319 if (i
->second
.empty()) {
1320 op
.source_to_obj
.erase(i
++);
1322 assert(!osdmap
->is_down(i
->first
.osd
));
1327 for (set
<hobject_t
>::iterator i
= to_cancel
.begin();
1328 i
!= to_cancel
.end();
1330 get_parent()->cancel_pull(*i
);
1332 assert(op
.to_read
.count(*i
));
1333 read_request_t
&req
= op
.to_read
.find(*i
)->second
;
1334 dout(10) << __func__
<< ": canceling " << req
1335 << " for obj " << *i
<< dendl
;
1340 op
.to_read
.erase(*i
);
1341 op
.complete
.erase(*i
);
1342 recovery_ops
.erase(*i
);
1345 if (op
.in_progress
.empty()) {
1346 get_parent()->schedule_recovery_work(
1347 get_parent()->bless_gencontext(
1348 new FinishReadOp(this, op
.tid
)));
1352 void ECBackend::check_recovery_sources(const OSDMapRef
& osdmap
)
1354 set
<ceph_tid_t
> tids_to_filter
;
1355 for (map
<pg_shard_t
, set
<ceph_tid_t
> >::iterator
1356 i
= shard_to_read_map
.begin();
1357 i
!= shard_to_read_map
.end();
1359 if (osdmap
->is_down(i
->first
.osd
)) {
1360 tids_to_filter
.insert(i
->second
.begin(), i
->second
.end());
1361 shard_to_read_map
.erase(i
++);
1366 for (set
<ceph_tid_t
>::iterator i
= tids_to_filter
.begin();
1367 i
!= tids_to_filter
.end();
1369 map
<ceph_tid_t
, ReadOp
>::iterator j
= tid_to_read_map
.find(*i
);
1370 assert(j
!= tid_to_read_map
.end());
1371 filter_read_op(osdmap
, j
->second
);
1375 void ECBackend::on_change()
1377 dout(10) << __func__
<< dendl
;
1379 completed_to
= eversion_t();
1380 committed_to
= eversion_t();
1381 pipeline_state
.clear();
1382 waiting_reads
.clear();
1383 waiting_state
.clear();
1384 waiting_commit
.clear();
1385 for (auto &&op
: tid_to_op_map
) {
1386 cache
.release_write_pin(op
.second
.pin
);
1388 tid_to_op_map
.clear();
1390 for (map
<ceph_tid_t
, ReadOp
>::iterator i
= tid_to_read_map
.begin();
1391 i
!= tid_to_read_map
.end();
1393 dout(10) << __func__
<< ": cancelling " << i
->second
<< dendl
;
1394 for (map
<hobject_t
, read_request_t
>::iterator j
=
1395 i
->second
.to_read
.begin();
1396 j
!= i
->second
.to_read
.end();
1398 delete j
->second
.cb
;
1402 tid_to_read_map
.clear();
1403 in_progress_client_reads
.clear();
1404 shard_to_read_map
.clear();
1405 clear_recovery_state();
1408 void ECBackend::clear_recovery_state()
1410 recovery_ops
.clear();
1413 void ECBackend::on_flushed()
1417 void ECBackend::dump_recovery_info(Formatter
*f
) const
1419 f
->open_array_section("recovery_ops");
1420 for (map
<hobject_t
, RecoveryOp
>::const_iterator i
= recovery_ops
.begin();
1421 i
!= recovery_ops
.end();
1423 f
->open_object_section("op");
1428 f
->open_array_section("read_ops");
1429 for (map
<ceph_tid_t
, ReadOp
>::const_iterator i
= tid_to_read_map
.begin();
1430 i
!= tid_to_read_map
.end();
1432 f
->open_object_section("read_op");
1439 void ECBackend::submit_transaction(
1440 const hobject_t
&hoid
,
1441 const object_stat_sum_t
&delta_stats
,
1442 const eversion_t
&at_version
,
1443 PGTransactionUPtr
&&t
,
1444 const eversion_t
&trim_to
,
1445 const eversion_t
&roll_forward_to
,
1446 const vector
<pg_log_entry_t
> &log_entries
,
1447 boost::optional
<pg_hit_set_history_t
> &hset_history
,
1448 Context
*on_local_applied_sync
,
1449 Context
*on_all_applied
,
1450 Context
*on_all_commit
,
1453 OpRequestRef client_op
1456 assert(!tid_to_op_map
.count(tid
));
1457 Op
*op
= &(tid_to_op_map
[tid
]);
1459 op
->delta_stats
= delta_stats
;
1460 op
->version
= at_version
;
1461 op
->trim_to
= trim_to
;
1462 op
->roll_forward_to
= MAX(roll_forward_to
, committed_to
);
1463 op
->log_entries
= log_entries
;
1464 std::swap(op
->updated_hit_set_history
, hset_history
);
1465 op
->on_local_applied_sync
= on_local_applied_sync
;
1466 op
->on_all_applied
= on_all_applied
;
1467 op
->on_all_commit
= on_all_commit
;
1470 op
->client_op
= client_op
;
1472 op
->trace
= client_op
->pg_trace
;
1474 dout(10) << __func__
<< ": op " << *op
<< " starting" << dendl
;
1475 start_rmw(op
, std::move(t
));
1476 dout(10) << "onreadable_sync: " << op
->on_local_applied_sync
<< dendl
;
1479 void ECBackend::call_write_ordered(std::function
<void(void)> &&cb
) {
1480 if (!waiting_state
.empty()) {
1481 waiting_state
.back().on_write
.emplace_back(std::move(cb
));
1482 } else if (!waiting_reads
.empty()) {
1483 waiting_reads
.back().on_write
.emplace_back(std::move(cb
));
1485 // Nothing earlier in the pipeline, just call it
1490 void ECBackend::get_all_avail_shards(
1491 const hobject_t
&hoid
,
1493 map
<shard_id_t
, pg_shard_t
> &shards
,
1496 for (set
<pg_shard_t
>::const_iterator i
=
1497 get_parent()->get_acting_shards().begin();
1498 i
!= get_parent()->get_acting_shards().end();
1500 dout(10) << __func__
<< ": checking acting " << *i
<< dendl
;
1501 const pg_missing_t
&missing
= get_parent()->get_shard_missing(*i
);
1502 if (!missing
.is_missing(hoid
)) {
1503 assert(!have
.count(i
->shard
));
1504 have
.insert(i
->shard
);
1505 assert(!shards
.count(i
->shard
));
1506 shards
.insert(make_pair(i
->shard
, *i
));
1511 for (set
<pg_shard_t
>::const_iterator i
=
1512 get_parent()->get_backfill_shards().begin();
1513 i
!= get_parent()->get_backfill_shards().end();
1515 if (have
.count(i
->shard
)) {
1516 assert(shards
.count(i
->shard
));
1519 dout(10) << __func__
<< ": checking backfill " << *i
<< dendl
;
1520 assert(!shards
.count(i
->shard
));
1521 const pg_info_t
&info
= get_parent()->get_shard_info(*i
);
1522 const pg_missing_t
&missing
= get_parent()->get_shard_missing(*i
);
1523 if (hoid
< info
.last_backfill
&&
1524 !missing
.is_missing(hoid
)) {
1525 have
.insert(i
->shard
);
1526 shards
.insert(make_pair(i
->shard
, *i
));
1530 map
<hobject_t
, set
<pg_shard_t
>>::const_iterator miter
=
1531 get_parent()->get_missing_loc_shards().find(hoid
);
1532 if (miter
!= get_parent()->get_missing_loc_shards().end()) {
1533 for (set
<pg_shard_t
>::iterator i
= miter
->second
.begin();
1534 i
!= miter
->second
.end();
1536 dout(10) << __func__
<< ": checking missing_loc " << *i
<< dendl
;
1537 auto m
= get_parent()->maybe_get_shard_missing(*i
);
1539 assert(!(*m
).is_missing(hoid
));
1541 have
.insert(i
->shard
);
1542 shards
.insert(make_pair(i
->shard
, *i
));
1548 int ECBackend::get_min_avail_to_read_shards(
1549 const hobject_t
&hoid
,
1550 const set
<int> &want
,
1552 bool do_redundant_reads
,
1553 set
<pg_shard_t
> *to_read
)
1555 // Make sure we don't do redundant reads for recovery
1556 assert(!for_recovery
|| !do_redundant_reads
);
1559 map
<shard_id_t
, pg_shard_t
> shards
;
1561 get_all_avail_shards(hoid
, have
, shards
, for_recovery
);
1564 int r
= ec_impl
->minimum_to_decode(want
, have
, &need
);
1568 if (do_redundant_reads
) {
1575 for (set
<int>::iterator i
= need
.begin();
1578 assert(shards
.count(shard_id_t(*i
)));
1579 to_read
->insert(shards
[shard_id_t(*i
)]);
1584 int ECBackend::get_remaining_shards(
1585 const hobject_t
&hoid
,
1586 const set
<int> &avail
,
1587 set
<pg_shard_t
> *to_read
,
1593 map
<shard_id_t
, pg_shard_t
> shards
;
1595 get_all_avail_shards(hoid
, have
, shards
, for_recovery
);
1597 for (set
<int>::iterator i
= have
.begin();
1600 assert(shards
.count(shard_id_t(*i
)));
1601 if (avail
.find(*i
) == avail
.end())
1602 to_read
->insert(shards
[shard_id_t(*i
)]);
1607 void ECBackend::start_read_op(
1609 map
<hobject_t
, read_request_t
> &to_read
,
1611 bool do_redundant_reads
,
1614 ceph_tid_t tid
= get_parent()->get_tid();
1615 assert(!tid_to_read_map
.count(tid
));
1616 auto &op
= tid_to_read_map
.emplace(
1624 std::move(to_read
))).first
->second
;
1625 dout(10) << __func__
<< ": starting " << op
<< dendl
;
1627 op
.trace
= _op
->pg_trace
;
1628 op
.trace
.event("start ec read");
1633 void ECBackend::do_read_op(ReadOp
&op
)
1635 int priority
= op
.priority
;
1636 ceph_tid_t tid
= op
.tid
;
1638 dout(10) << __func__
<< ": starting read " << op
<< dendl
;
1640 map
<pg_shard_t
, ECSubRead
> messages
;
1641 for (map
<hobject_t
, read_request_t
>::iterator i
= op
.to_read
.begin();
1642 i
!= op
.to_read
.end();
1644 bool need_attrs
= i
->second
.want_attrs
;
1645 for (set
<pg_shard_t
>::const_iterator j
= i
->second
.need
.begin();
1646 j
!= i
->second
.need
.end();
1649 messages
[*j
].attrs_to_read
.insert(i
->first
);
1652 op
.obj_to_source
[i
->first
].insert(*j
);
1653 op
.source_to_obj
[*j
].insert(i
->first
);
1655 for (list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >::const_iterator j
=
1656 i
->second
.to_read
.begin();
1657 j
!= i
->second
.to_read
.end();
1659 pair
<uint64_t, uint64_t> chunk_off_len
=
1660 sinfo
.aligned_offset_len_to_chunk(make_pair(j
->get
<0>(), j
->get
<1>()));
1661 for (set
<pg_shard_t
>::const_iterator k
= i
->second
.need
.begin();
1662 k
!= i
->second
.need
.end();
1664 messages
[*k
].to_read
[i
->first
].push_back(
1666 chunk_off_len
.first
,
1667 chunk_off_len
.second
,
1670 assert(!need_attrs
);
1674 for (map
<pg_shard_t
, ECSubRead
>::iterator i
= messages
.begin();
1675 i
!= messages
.end();
1677 op
.in_progress
.insert(i
->first
);
1678 shard_to_read_map
[i
->first
].insert(op
.tid
);
1679 i
->second
.tid
= tid
;
1680 MOSDECSubOpRead
*msg
= new MOSDECSubOpRead
;
1681 msg
->set_priority(priority
);
1683 get_parent()->whoami_spg_t().pgid
,
1685 msg
->map_epoch
= get_parent()->get_epoch();
1686 msg
->min_epoch
= get_parent()->get_interval_start_epoch();
1687 msg
->op
= i
->second
;
1688 msg
->op
.from
= get_parent()->whoami_shard();
1691 // initialize a child span for this shard
1692 msg
->trace
.init("ec sub read", nullptr, &op
.trace
);
1693 msg
->trace
.keyval("shard", i
->first
.shard
.id
);
1695 get_parent()->send_message_osd_cluster(
1698 get_parent()->get_epoch());
1700 dout(10) << __func__
<< ": started " << op
<< dendl
;
1703 ECUtil::HashInfoRef
ECBackend::get_hash_info(
1704 const hobject_t
&hoid
, bool checks
, const map
<string
,bufferptr
> *attrs
)
1706 dout(10) << __func__
<< ": Getting attr on " << hoid
<< dendl
;
1707 ECUtil::HashInfoRef ref
= unstable_hashinfo_registry
.lookup(hoid
);
1709 dout(10) << __func__
<< ": not in cache " << hoid
<< dendl
;
1711 int r
= store
->stat(
1713 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1715 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
1716 // XXX: What does it mean if there is no object on disk?
1718 dout(10) << __func__
<< ": found on disk, size " << st
.st_size
<< dendl
;
1721 map
<string
, bufferptr
>::const_iterator k
= attrs
->find(ECUtil::get_hinfo_key());
1722 if (k
== attrs
->end()) {
1723 dout(5) << __func__
<< " " << hoid
<< " missing hinfo attr" << dendl
;
1725 bl
.push_back(k
->second
);
1730 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1731 ECUtil::get_hinfo_key(),
1734 dout(5) << __func__
<< ": getattr failed: " << cpp_strerror(r
) << dendl
;
1735 bl
.clear(); // just in case
1738 if (bl
.length() > 0) {
1739 bufferlist::iterator bp
= bl
.begin();
1740 ::decode(hinfo
, bp
);
1741 if (checks
&& hinfo
.get_total_chunk_size() != (uint64_t)st
.st_size
) {
1742 dout(0) << __func__
<< ": Mismatch of total_chunk_size "
1743 << hinfo
.get_total_chunk_size() << dendl
;
1744 return ECUtil::HashInfoRef();
1746 } else if (st
.st_size
> 0) { // If empty object and no hinfo, create it
1747 return ECUtil::HashInfoRef();
1750 ref
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
1755 void ECBackend::start_rmw(Op
*op
, PGTransactionUPtr
&&t
)
1759 op
->plan
= ECTransaction::get_write_plan(
1762 [&](const hobject_t
&i
) {
1763 ECUtil::HashInfoRef ref
= get_hash_info(i
, false);
1765 derr
<< __func__
<< ": get_hash_info(" << i
<< ")"
1766 << " returned a null pointer and there is no "
1767 << " way to recover from such an error in this "
1768 << " context" << dendl
;
1773 get_parent()->get_dpp());
1775 dout(10) << __func__
<< ": " << *op
<< dendl
;
1777 waiting_state
.push_back(*op
);
1781 bool ECBackend::try_state_to_reads()
1783 if (waiting_state
.empty())
1786 Op
*op
= &(waiting_state
.front());
1787 if (op
->requires_rmw() && pipeline_state
.cache_invalid()) {
1788 assert(get_parent()->get_pool().allows_ecoverwrites());
1789 dout(20) << __func__
<< ": blocking " << *op
1790 << " because it requires an rmw and the cache is invalid "
1796 if (op
->invalidates_cache()) {
1797 dout(20) << __func__
<< ": invalidating cache after this op"
1799 pipeline_state
.invalidate();
1800 op
->using_cache
= false;
1802 op
->using_cache
= pipeline_state
.caching_enabled();
1805 waiting_state
.pop_front();
1806 waiting_reads
.push_back(*op
);
1808 if (op
->using_cache
) {
1809 cache
.open_write_pin(op
->pin
);
1812 for (auto &&hpair
: op
->plan
.will_write
) {
1813 auto to_read_plan_iter
= op
->plan
.to_read
.find(hpair
.first
);
1814 const extent_set
&to_read_plan
=
1815 to_read_plan_iter
== op
->plan
.to_read
.end() ?
1817 to_read_plan_iter
->second
;
1819 extent_set remote_read
= cache
.reserve_extents_for_rmw(
1825 extent_set pending_read
= to_read_plan
;
1826 pending_read
.subtract(remote_read
);
1828 if (!remote_read
.empty()) {
1829 op
->remote_read
[hpair
.first
] = std::move(remote_read
);
1831 if (!pending_read
.empty()) {
1832 op
->pending_read
[hpair
.first
] = std::move(pending_read
);
1836 op
->remote_read
= op
->plan
.to_read
;
1839 dout(10) << __func__
<< ": " << *op
<< dendl
;
1841 if (!op
->remote_read
.empty()) {
1842 assert(get_parent()->get_pool().allows_ecoverwrites());
1843 objects_read_async_no_cache(
1845 [this, op
](map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
1846 for (auto &&i
: results
) {
1847 op
->remote_read_result
.emplace(i
.first
, i
.second
.second
);
1856 bool ECBackend::try_reads_to_commit()
1858 if (waiting_reads
.empty())
1860 Op
*op
= &(waiting_reads
.front());
1861 if (op
->read_in_progress())
1863 waiting_reads
.pop_front();
1864 waiting_commit
.push_back(*op
);
1866 dout(10) << __func__
<< ": starting commit on " << *op
<< dendl
;
1867 dout(20) << __func__
<< ": " << cache
<< dendl
;
1869 get_parent()->apply_stats(
1873 if (op
->using_cache
) {
1874 for (auto &&hpair
: op
->pending_read
) {
1875 op
->remote_read_result
[hpair
.first
].insert(
1876 cache
.get_remaining_extents_for_rmw(
1881 op
->pending_read
.clear();
1883 assert(op
->pending_read
.empty());
1886 map
<shard_id_t
, ObjectStore::Transaction
> trans
;
1887 for (set
<pg_shard_t
>::const_iterator i
=
1888 get_parent()->get_actingbackfill_shards().begin();
1889 i
!= get_parent()->get_actingbackfill_shards().end();
1894 op
->trace
.event("start ec write");
1896 map
<hobject_t
,extent_map
> written
;
1898 ECTransaction::generate_transactions(
1901 get_parent()->get_info().pgid
.pgid
,
1902 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
1904 op
->remote_read_result
,
1909 &(op
->temp_cleared
),
1910 get_parent()->get_dpp());
1913 dout(20) << __func__
<< ": " << cache
<< dendl
;
1914 dout(20) << __func__
<< ": written: " << written
<< dendl
;
1915 dout(20) << __func__
<< ": op: " << *op
<< dendl
;
1917 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1918 for (auto &&i
: op
->log_entries
) {
1919 if (i
.requires_kraken()) {
1920 derr
<< __func__
<< ": log entry " << i
<< " requires kraken"
1921 << " but overwrites are not enabled!" << dendl
;
1927 map
<hobject_t
,extent_set
> written_set
;
1928 for (auto &&i
: written
) {
1929 written_set
[i
.first
] = i
.second
.get_interval_set();
1931 dout(20) << __func__
<< ": written_set: " << written_set
<< dendl
;
1932 assert(written_set
== op
->plan
.will_write
);
1934 if (op
->using_cache
) {
1935 for (auto &&hpair
: written
) {
1936 dout(20) << __func__
<< ": " << hpair
<< dendl
;
1937 cache
.present_rmw_update(hpair
.first
, op
->pin
, hpair
.second
);
1940 op
->remote_read
.clear();
1941 op
->remote_read_result
.clear();
1943 dout(10) << "onreadable_sync: " << op
->on_local_applied_sync
<< dendl
;
1944 ObjectStore::Transaction empty
;
1945 bool should_write_local
= false;
1946 ECSubWrite local_write_op
;
1947 for (set
<pg_shard_t
>::const_iterator i
=
1948 get_parent()->get_actingbackfill_shards().begin();
1949 i
!= get_parent()->get_actingbackfill_shards().end();
1951 op
->pending_apply
.insert(*i
);
1952 op
->pending_commit
.insert(*i
);
1953 map
<shard_id_t
, ObjectStore::Transaction
>::iterator iter
=
1954 trans
.find(i
->shard
);
1955 assert(iter
!= trans
.end());
1956 bool should_send
= get_parent()->should_send_op(*i
, op
->hoid
);
1957 const pg_stat_t
&stats
=
1960 parent
->get_shard_info().find(*i
)->second
.stats
;
1963 get_parent()->whoami_shard(),
1968 should_send
? iter
->second
: empty
,
1971 op
->roll_forward_to
,
1973 op
->updated_hit_set_history
,
1978 ZTracer::Trace trace
;
1980 // initialize a child span for this shard
1981 trace
.init("ec sub write", nullptr, &op
->trace
);
1982 trace
.keyval("shard", i
->shard
.id
);
1985 if (*i
== get_parent()->whoami_shard()) {
1986 should_write_local
= true;
1987 local_write_op
.claim(sop
);
1989 MOSDECSubOpWrite
*r
= new MOSDECSubOpWrite(sop
);
1990 r
->pgid
= spg_t(get_parent()->primary_spg_t().pgid
, i
->shard
);
1991 r
->map_epoch
= get_parent()->get_epoch();
1992 r
->min_epoch
= get_parent()->get_interval_start_epoch();
1994 get_parent()->send_message_osd_cluster(
1995 i
->osd
, r
, get_parent()->get_epoch());
1998 if (should_write_local
) {
2000 get_parent()->whoami_shard(),
2004 op
->on_local_applied_sync
);
2005 op
->on_local_applied_sync
= 0;
2008 for (auto i
= op
->on_write
.begin();
2009 i
!= op
->on_write
.end();
2010 op
->on_write
.erase(i
++)) {
2017 bool ECBackend::try_finish_rmw()
2019 if (waiting_commit
.empty())
2021 Op
*op
= &(waiting_commit
.front());
2022 if (op
->write_in_progress())
2024 waiting_commit
.pop_front();
2026 dout(10) << __func__
<< ": " << *op
<< dendl
;
2027 dout(20) << __func__
<< ": " << cache
<< dendl
;
2029 if (op
->roll_forward_to
> completed_to
)
2030 completed_to
= op
->roll_forward_to
;
2031 if (op
->version
> committed_to
)
2032 committed_to
= op
->version
;
2034 if (get_osdmap()->require_osd_release
>= CEPH_RELEASE_KRAKEN
) {
2035 if (op
->version
> get_parent()->get_log().get_can_rollback_to() &&
2036 waiting_reads
.empty() &&
2037 waiting_commit
.empty()) {
2038 // submit a dummy transaction to kick the rollforward
2039 auto tid
= get_parent()->get_tid();
2040 Op
*nop
= &(tid_to_op_map
[tid
]);
2041 nop
->hoid
= op
->hoid
;
2042 nop
->trim_to
= op
->trim_to
;
2043 nop
->roll_forward_to
= op
->version
;
2045 nop
->reqid
= op
->reqid
;
2046 waiting_reads
.push_back(*nop
);
2050 if (op
->using_cache
) {
2051 cache
.release_write_pin(op
->pin
);
2053 tid_to_op_map
.erase(op
->tid
);
2055 if (waiting_reads
.empty() &&
2056 waiting_commit
.empty()) {
2057 pipeline_state
.clear();
2058 dout(20) << __func__
<< ": clearing pipeline_state "
2065 void ECBackend::check_ops()
2067 while (try_state_to_reads() ||
2068 try_reads_to_commit() ||
2072 int ECBackend::objects_read_sync(
2073 const hobject_t
&hoid
,
2082 void ECBackend::objects_read_async(
2083 const hobject_t
&hoid
,
2084 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2085 pair
<bufferlist
*, Context
*> > > &to_read
,
2086 Context
*on_complete
,
2089 map
<hobject_t
,std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > >
2094 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2095 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
2099 pair
<uint64_t, uint64_t> tmp
=
2100 sinfo
.offset_len_to_stripe_bounds(
2101 make_pair(i
->first
.get
<0>(), i
->first
.get
<1>()));
2104 esnew
.insert(tmp
.first
, tmp
.second
);
2106 flags
|= i
->first
.get
<2>();
2110 auto &offsets
= reads
[hoid
];
2111 for (auto j
= es
.begin();
2125 list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2126 pair
<bufferlist
*, Context
*> > > to_read
;
2127 unique_ptr
<Context
> on_complete
;
2128 cb(const cb
&) = delete;
2129 cb(cb
&&) = default;
2131 const hobject_t
&hoid
,
2132 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2133 pair
<bufferlist
*, Context
*> > > &to_read
,
2134 Context
*on_complete
)
2138 on_complete(on_complete
) {}
2139 void operator()(map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
2140 auto dpp
= ec
->get_parent()->get_dpp();
2141 ldpp_dout(dpp
, 20) << "objects_read_async_cb: got: " << results
2143 ldpp_dout(dpp
, 20) << "objects_read_async_cb: cache: " << ec
->cache
2146 auto &got
= results
[hoid
];
2149 for (auto &&read
: to_read
) {
2150 if (got
.first
< 0) {
2151 if (read
.second
.second
) {
2152 read
.second
.second
->complete(got
.first
);
2157 assert(read
.second
.first
);
2158 uint64_t offset
= read
.first
.get
<0>();
2159 uint64_t length
= read
.first
.get
<1>();
2160 auto range
= got
.second
.get_containing_range(offset
, length
);
2161 assert(range
.first
!= range
.second
);
2162 assert(range
.first
.get_off() <= offset
);
2164 (offset
+ length
) <=
2165 (range
.first
.get_off() + range
.first
.get_len()));
2166 read
.second
.first
->substr_of(
2167 range
.first
.get_val(),
2168 offset
- range
.first
.get_off(),
2170 if (read
.second
.second
) {
2171 read
.second
.second
->complete(length
);
2172 read
.second
.second
= nullptr;
2178 on_complete
.release()->complete(r
);
2182 for (auto &&i
: to_read
) {
2183 delete i
.second
.second
;
2188 objects_read_and_reconstruct(
2191 make_gen_lambda_context
<
2192 map
<hobject_t
,pair
<int, extent_map
> > &&, cb
>(
2199 struct CallClientContexts
:
2200 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
2203 ECBackend::ClientAsyncReadStatus
*status
;
2204 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
2208 ECBackend::ClientAsyncReadStatus
*status
,
2209 const list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > &to_read
)
2210 : hoid(hoid
), ec(ec
), status(status
), to_read(to_read
) {}
2211 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
2212 ECBackend::read_result_t
&res
= in
.second
;
2216 assert(res
.returned
.size() == to_read
.size());
2218 assert(res
.errors
.empty());
2219 for (auto &&read
: to_read
) {
2220 pair
<uint64_t, uint64_t> adjusted
=
2221 ec
->sinfo
.offset_len_to_stripe_bounds(
2222 make_pair(read
.get
<0>(), read
.get
<1>()));
2223 assert(res
.returned
.front().get
<0>() == adjusted
.first
&&
2224 res
.returned
.front().get
<1>() == adjusted
.second
);
2225 map
<int, bufferlist
> to_decode
;
2227 for (map
<pg_shard_t
, bufferlist
>::iterator j
=
2228 res
.returned
.front().get
<2>().begin();
2229 j
!= res
.returned
.front().get
<2>().end();
2231 to_decode
[j
->first
.shard
].claim(j
->second
);
2233 int r
= ECUtil::decode(
2245 read
.get
<0>() - adjusted
.first
,
2247 bl
.length() - (read
.get
<0>() - adjusted
.first
)));
2249 read
.get
<0>(), trimmed
.length(), std::move(trimmed
));
2250 res
.returned
.pop_front();
2253 status
->complete_object(hoid
, res
.r
, std::move(result
));
2258 void ECBackend::objects_read_and_reconstruct(
2259 const map
<hobject_t
,
2260 std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >
2263 GenContextURef
<map
<hobject_t
,pair
<int, extent_map
> > &&> &&func
)
2265 in_progress_client_reads
.emplace_back(
2266 reads
.size(), std::move(func
));
2267 if (!reads
.size()) {
2272 set
<int> want_to_read
;
2273 get_want_to_read_shards(&want_to_read
);
2275 map
<hobject_t
, read_request_t
> for_read_op
;
2276 for (auto &&to_read
: reads
) {
2277 set
<pg_shard_t
> shards
;
2278 int r
= get_min_avail_to_read_shards(
2286 CallClientContexts
*c
= new CallClientContexts(
2289 &(in_progress_client_reads
.back()),
2302 CEPH_MSG_PRIO_DEFAULT
,
2310 int ECBackend::send_all_remaining_reads(
2311 const hobject_t
&hoid
,
2314 set
<int> already_read
;
2315 const set
<pg_shard_t
>& ots
= rop
.obj_to_source
[hoid
];
2316 for (set
<pg_shard_t
>::iterator i
= ots
.begin(); i
!= ots
.end(); ++i
)
2317 already_read
.insert(i
->shard
);
2318 dout(10) << __func__
<< " have/error shards=" << already_read
<< dendl
;
2319 set
<pg_shard_t
> shards
;
2320 int r
= get_remaining_shards(hoid
, already_read
, &shards
, rop
.for_recovery
);
2326 dout(10) << __func__
<< " Read remaining shards " << shards
<< dendl
;
2328 // TODOSAM: this doesn't seem right
2329 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > offsets
=
2330 rop
.to_read
.find(hoid
)->second
.to_read
;
2331 GenContext
<pair
<RecoveryMessages
*, read_result_t
& > &> *c
=
2332 rop
.to_read
.find(hoid
)->second
.cb
;
2334 map
<hobject_t
, read_request_t
> for_read_op
;
2344 rop
.to_read
.swap(for_read_op
);
2349 int ECBackend::objects_get_attrs(
2350 const hobject_t
&hoid
,
2351 map
<string
, bufferlist
> *out
)
2353 int r
= store
->getattrs(
2355 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2360 for (map
<string
, bufferlist
>::iterator i
= out
->begin();
2363 if (ECUtil::is_hinfo_key_string(i
->first
))
2371 void ECBackend::rollback_append(
2372 const hobject_t
&hoid
,
2374 ObjectStore::Transaction
*t
)
2376 assert(old_size
% sinfo
.get_stripe_width() == 0);
2379 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2380 sinfo
.aligned_logical_offset_to_chunk_offset(
2384 void ECBackend::be_deep_scrub(
2385 const hobject_t
&poid
,
2387 ScrubMap::object
&o
,
2388 ThreadPool::TPHandle
&handle
) {
2389 bufferhash
h(-1); // we always used -1
2391 uint64_t stride
= cct
->_conf
->osd_deep_scrub_stride
;
2392 if (stride
% sinfo
.get_chunk_size())
2393 stride
+= sinfo
.get_chunk_size() - (stride
% sinfo
.get_chunk_size());
2396 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
| CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
2400 handle
.reset_tp_timeout();
2404 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2410 if (bl
.length() % sinfo
.get_chunk_size()) {
2416 if ((unsigned)r
< stride
)
2421 dout(0) << "_scan_list " << poid
<< " got "
2422 << r
<< " on read, read_error" << dendl
;
2423 o
.read_error
= true;
2427 ECUtil::HashInfoRef hinfo
= get_hash_info(poid
, false, &o
.attrs
);
2429 dout(0) << "_scan_list " << poid
<< " could not retrieve hash info" << dendl
;
2430 o
.read_error
= true;
2431 o
.digest_present
= false;
2434 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2435 assert(hinfo
->has_chunk_hash());
2436 if (hinfo
->get_total_chunk_size() != pos
) {
2437 dout(0) << "_scan_list " << poid
<< " got incorrect size on read" << dendl
;
2438 o
.ec_size_mismatch
= true;
2442 if (hinfo
->get_chunk_hash(get_parent()->whoami_shard().shard
) != h
.digest()) {
2443 dout(0) << "_scan_list " << poid
<< " got incorrect hash on read" << dendl
;
2444 o
.ec_hash_mismatch
= true;
2448 /* We checked above that we match our own stored hash. We cannot
2449 * send a hash of the actual object, so instead we simply send
2450 * our locally stored hash of shard 0 on the assumption that if
2451 * we match our chunk hash and our recollection of the hash for
2452 * chunk 0 matches that of our peers, there is likely no corruption.
2454 o
.digest
= hinfo
->get_chunk_hash(0);
2455 o
.digest_present
= true;
2457 /* Hack! We must be using partial overwrites, and partial overwrites
2458 * don't support deep-scrub yet
2461 o
.digest_present
= true;
2465 o
.omap_digest
= seed
;
2466 o
.omap_digest_present
= true;