1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
27 #include "PrimaryLogPG.h"
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
33 #define dout_prefix _prefix(_dout, this)
34 static ostream
& _prefix(std::ostream
*_dout
, ECBackend
*pgb
) {
35 return *_dout
<< pgb
->get_parent()->gen_dbg_prefix();
38 struct ECRecoveryHandle
: public PGBackend::RecoveryHandle
{
39 list
<ECBackend::RecoveryOp
> ops
;
42 ostream
&operator<<(ostream
&lhs
, const ECBackend::pipeline_state_t
&rhs
) {
43 switch (rhs
.pipeline_state
) {
44 case ECBackend::pipeline_state_t::CACHE_VALID
:
45 return lhs
<< "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID
:
47 return lhs
<< "CACHE_INVALID";
49 assert(0 == "invalid pipeline state");
51 return lhs
; // unreachable
54 static ostream
&operator<<(ostream
&lhs
, const map
<pg_shard_t
, bufferlist
> &rhs
)
57 for (map
<pg_shard_t
, bufferlist
>::const_iterator i
= rhs
.begin();
62 lhs
<< make_pair(i
->first
, i
->second
.length());
67 static ostream
&operator<<(ostream
&lhs
, const map
<int, bufferlist
> &rhs
)
70 for (map
<int, bufferlist
>::const_iterator i
= rhs
.begin();
75 lhs
<< make_pair(i
->first
, i
->second
.length());
80 static ostream
&operator<<(
82 const boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &rhs
)
84 return lhs
<< "(" << rhs
.get
<0>() << ", "
85 << rhs
.get
<1>() << ", " << rhs
.get
<2>() << ")";
88 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_request_t
&rhs
)
90 return lhs
<< "read_request_t(to_read=[" << rhs
.to_read
<< "]"
91 << ", need=" << rhs
.need
92 << ", want_attrs=" << rhs
.want_attrs
96 ostream
&operator<<(ostream
&lhs
, const ECBackend::read_result_t
&rhs
)
98 lhs
<< "read_result_t(r=" << rhs
.r
99 << ", errors=" << rhs
.errors
;
101 lhs
<< ", attrs=" << rhs
.attrs
.get();
105 return lhs
<< ", returned=" << rhs
.returned
<< ")";
108 ostream
&operator<<(ostream
&lhs
, const ECBackend::ReadOp
&rhs
)
110 lhs
<< "ReadOp(tid=" << rhs
.tid
;
111 if (rhs
.op
&& rhs
.op
->get_req()) {
113 rhs
.op
->get_req()->print(lhs
);
115 return lhs
<< ", to_read=" << rhs
.to_read
116 << ", complete=" << rhs
.complete
117 << ", priority=" << rhs
.priority
118 << ", obj_to_source=" << rhs
.obj_to_source
119 << ", source_to_obj=" << rhs
.source_to_obj
120 << ", in_progress=" << rhs
.in_progress
<< ")";
123 void ECBackend::ReadOp::dump(Formatter
*f
) const
125 f
->dump_unsigned("tid", tid
);
126 if (op
&& op
->get_req()) {
127 f
->dump_stream("op") << *(op
->get_req());
129 f
->dump_stream("to_read") << to_read
;
130 f
->dump_stream("complete") << complete
;
131 f
->dump_int("priority", priority
);
132 f
->dump_stream("obj_to_source") << obj_to_source
;
133 f
->dump_stream("source_to_obj") << source_to_obj
;
134 f
->dump_stream("in_progress") << in_progress
;
137 ostream
&operator<<(ostream
&lhs
, const ECBackend::Op
&rhs
)
139 lhs
<< "Op(" << rhs
.hoid
140 << " v=" << rhs
.version
141 << " tt=" << rhs
.trim_to
142 << " tid=" << rhs
.tid
143 << " reqid=" << rhs
.reqid
;
144 if (rhs
.client_op
&& rhs
.client_op
->get_req()) {
145 lhs
<< " client_op=";
146 rhs
.client_op
->get_req()->print(lhs
);
148 lhs
<< " roll_forward_to=" << rhs
.roll_forward_to
149 << " temp_added=" << rhs
.temp_added
150 << " temp_cleared=" << rhs
.temp_cleared
151 << " pending_read=" << rhs
.pending_read
152 << " remote_read=" << rhs
.remote_read
153 << " remote_read_result=" << rhs
.remote_read_result
154 << " pending_apply=" << rhs
.pending_apply
155 << " pending_commit=" << rhs
.pending_commit
156 << " plan.to_read=" << rhs
.plan
.to_read
157 << " plan.will_write=" << rhs
.plan
.will_write
162 ostream
&operator<<(ostream
&lhs
, const ECBackend::RecoveryOp
&rhs
)
164 return lhs
<< "RecoveryOp("
165 << "hoid=" << rhs
.hoid
167 << " missing_on=" << rhs
.missing_on
168 << " missing_on_shards=" << rhs
.missing_on_shards
169 << " recovery_info=" << rhs
.recovery_info
170 << " recovery_progress=" << rhs
.recovery_progress
171 << " obc refcount=" << rhs
.obc
.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs
.state
)
173 << " waiting_on_pushes=" << rhs
.waiting_on_pushes
174 << " extent_requested=" << rhs
.extent_requested
178 void ECBackend::RecoveryOp::dump(Formatter
*f
) const
180 f
->dump_stream("hoid") << hoid
;
181 f
->dump_stream("v") << v
;
182 f
->dump_stream("missing_on") << missing_on
;
183 f
->dump_stream("missing_on_shards") << missing_on_shards
;
184 f
->dump_stream("recovery_info") << recovery_info
;
185 f
->dump_stream("recovery_progress") << recovery_progress
;
186 f
->dump_stream("state") << tostr(state
);
187 f
->dump_stream("waiting_on_pushes") << waiting_on_pushes
;
188 f
->dump_stream("extent_requested") << extent_requested
;
191 ECBackend::ECBackend(
192 PGBackend::Listener
*pg
,
194 ObjectStore::CollectionHandle
&ch
,
197 ErasureCodeInterfaceRef ec_impl
,
198 uint64_t stripe_width
)
199 : PGBackend(cct
, pg
, store
, coll
, ch
),
201 sinfo(ec_impl
->get_data_chunk_count(), stripe_width
) {
202 assert((ec_impl
->get_data_chunk_count() *
203 ec_impl
->get_chunk_size(stripe_width
)) == stripe_width
);
206 PGBackend::RecoveryHandle
*ECBackend::open_recovery_op()
208 return new ECRecoveryHandle
;
211 void ECBackend::_failed_push(const hobject_t
&hoid
,
212 pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
)
214 ECBackend::read_result_t
&res
= in
.second
;
215 dout(10) << __func__
<< ": Read error " << hoid
<< " r="
216 << res
.r
<< " errors=" << res
.errors
<< dendl
;
217 dout(10) << __func__
<< ": canceling recovery op for obj " << hoid
219 assert(recovery_ops
.count(hoid
));
220 eversion_t v
= recovery_ops
[hoid
].v
;
221 recovery_ops
.erase(hoid
);
224 for (auto&& i
: res
.errors
) {
225 fl
.push_back(i
.first
);
227 get_parent()->failed_push(fl
, hoid
);
228 get_parent()->backfill_add_missing(hoid
, v
);
229 get_parent()->finish_degraded_object(hoid
);
232 struct OnRecoveryReadComplete
:
233 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
237 OnRecoveryReadComplete(ECBackend
*pg
, const hobject_t
&hoid
)
238 : pg(pg
), hoid(hoid
) {}
239 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
240 ECBackend::read_result_t
&res
= in
.second
;
241 if (!(res
.r
== 0 && res
.errors
.empty())) {
242 pg
->_failed_push(hoid
, in
);
245 assert(res
.returned
.size() == 1);
246 pg
->handle_recovery_read_complete(
254 struct RecoveryMessages
{
256 ECBackend::read_request_t
> reads
;
257 map
<hobject_t
, set
<int>> want_to_read
;
260 const hobject_t
&hoid
, uint64_t off
, uint64_t len
,
261 set
<int> &&_want_to_read
,
262 const set
<pg_shard_t
> &need
,
264 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
265 to_read
.push_back(boost::make_tuple(off
, len
, 0));
266 assert(!reads
.count(hoid
));
267 want_to_read
.insert(make_pair(hoid
, std::move(_want_to_read
)));
271 ECBackend::read_request_t(
275 new OnRecoveryReadComplete(
280 map
<pg_shard_t
, vector
<PushOp
> > pushes
;
281 map
<pg_shard_t
, vector
<PushReplyOp
> > push_replies
;
282 ObjectStore::Transaction t
;
283 RecoveryMessages() {}
284 ~RecoveryMessages(){}
287 void ECBackend::handle_recovery_push(
292 if (get_parent()->check_failsafe_full(ss
)) {
293 dout(10) << __func__
<< " Out of space (failsafe) processing push request: " << ss
.str() << dendl
;
297 bool oneshot
= op
.before_progress
.first
&& op
.after_progress
.data_complete
;
300 tobj
= ghobject_t(op
.soid
, ghobject_t::NO_GEN
,
301 get_parent()->whoami_shard().shard
);
303 tobj
= ghobject_t(get_parent()->get_temp_recovery_object(op
.soid
,
306 get_parent()->whoami_shard().shard
);
307 if (op
.before_progress
.first
) {
308 dout(10) << __func__
<< ": Adding oid "
309 << tobj
.hobj
<< " in the temp collection" << dendl
;
310 add_temp_obj(tobj
.hobj
);
314 if (op
.before_progress
.first
) {
315 m
->t
.remove(coll
, tobj
);
316 m
->t
.touch(coll
, tobj
);
319 if (!op
.data_included
.empty()) {
320 uint64_t start
= op
.data_included
.range_start();
321 uint64_t end
= op
.data_included
.range_end();
322 assert(op
.data
.length() == (end
- start
));
331 assert(op
.data
.length() == 0);
334 if (op
.before_progress
.first
) {
335 assert(op
.attrset
.count(string("_")));
342 if (op
.after_progress
.data_complete
&& !oneshot
) {
343 dout(10) << __func__
<< ": Removing oid "
344 << tobj
.hobj
<< " from the temp collection" << dendl
;
345 clear_temp_obj(tobj
.hobj
);
346 m
->t
.remove(coll
, ghobject_t(
347 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
348 m
->t
.collection_move_rename(
351 op
.soid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
));
353 if (op
.after_progress
.data_complete
) {
354 if ((get_parent()->pgb_is_primary())) {
355 assert(recovery_ops
.count(op
.soid
));
356 assert(recovery_ops
[op
.soid
].obc
);
357 get_parent()->on_local_recover(
360 recovery_ops
[op
.soid
].obc
,
364 get_parent()->on_local_recover(
372 m
->push_replies
[get_parent()->primary_shard()].push_back(PushReplyOp());
373 m
->push_replies
[get_parent()->primary_shard()].back().soid
= op
.soid
;
376 void ECBackend::handle_recovery_push_reply(
377 const PushReplyOp
&op
,
381 if (!recovery_ops
.count(op
.soid
))
383 RecoveryOp
&rop
= recovery_ops
[op
.soid
];
384 assert(rop
.waiting_on_pushes
.count(from
));
385 rop
.waiting_on_pushes
.erase(from
);
386 continue_recovery_op(rop
, m
);
389 void ECBackend::handle_recovery_read_complete(
390 const hobject_t
&hoid
,
391 boost::tuple
<uint64_t, uint64_t, map
<pg_shard_t
, bufferlist
> > &to_read
,
392 boost::optional
<map
<string
, bufferlist
> > attrs
,
395 dout(10) << __func__
<< ": returned " << hoid
<< " "
396 << "(" << to_read
.get
<0>()
397 << ", " << to_read
.get
<1>()
398 << ", " << to_read
.get
<2>()
401 assert(recovery_ops
.count(hoid
));
402 RecoveryOp
&op
= recovery_ops
[hoid
];
403 assert(op
.returned_data
.empty());
404 map
<int, bufferlist
*> target
;
405 for (set
<shard_id_t
>::iterator i
= op
.missing_on_shards
.begin();
406 i
!= op
.missing_on_shards
.end();
408 target
[*i
] = &(op
.returned_data
[*i
]);
410 map
<int, bufferlist
> from
;
411 for(map
<pg_shard_t
, bufferlist
>::iterator i
= to_read
.get
<2>().begin();
412 i
!= to_read
.get
<2>().end();
414 from
[i
->first
.shard
].claim(i
->second
);
416 dout(10) << __func__
<< ": " << from
<< dendl
;
417 int r
= ECUtil::decode(sinfo
, ec_impl
, from
, target
);
420 op
.xattrs
.swap(*attrs
);
423 // attrs only reference the origin bufferlist (decode from
424 // ECSubReadReply message) whose size is much greater than attrs
425 // in recovery. If obc cache it (get_obc maybe cache the attr),
426 // this causes the whole origin bufferlist would not be free
427 // until obc is evicted from obc cache. So rebuild the
428 // bufferlist before cache it.
429 for (map
<string
, bufferlist
>::iterator it
= op
.xattrs
.begin();
430 it
!= op
.xattrs
.end();
432 it
->second
.rebuild();
434 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
435 // of the backend (see bug #12983)
436 map
<string
, bufferlist
> sanitized_attrs(op
.xattrs
);
437 sanitized_attrs
.erase(ECUtil::get_hinfo_key());
438 op
.obc
= get_parent()->get_obc(hoid
, sanitized_attrs
);
440 op
.recovery_info
.size
= op
.obc
->obs
.oi
.size
;
441 op
.recovery_info
.oi
= op
.obc
->obs
.oi
;
444 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
445 if (op
.obc
->obs
.oi
.size
> 0) {
446 assert(op
.xattrs
.count(ECUtil::get_hinfo_key()));
447 bufferlist::iterator bp
= op
.xattrs
[ECUtil::get_hinfo_key()].begin();
450 op
.hinfo
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
452 assert(op
.xattrs
.size());
454 continue_recovery_op(op
, m
);
457 struct SendPushReplies
: public Context
{
458 PGBackend::Listener
*l
;
460 map
<int, MOSDPGPushReply
*> replies
;
462 PGBackend::Listener
*l
,
464 map
<int, MOSDPGPushReply
*> &in
) : l(l
), epoch(epoch
) {
467 void finish(int) override
{
468 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
471 l
->send_message_osd_cluster(i
->first
, i
->second
, epoch
);
475 ~SendPushReplies() override
{
476 for (map
<int, MOSDPGPushReply
*>::iterator i
= replies
.begin();
485 void ECBackend::dispatch_recovery_messages(RecoveryMessages
&m
, int priority
)
487 for (map
<pg_shard_t
, vector
<PushOp
> >::iterator i
= m
.pushes
.begin();
489 m
.pushes
.erase(i
++)) {
490 MOSDPGPush
*msg
= new MOSDPGPush();
491 msg
->set_priority(priority
);
492 msg
->map_epoch
= get_parent()->get_epoch();
493 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
494 msg
->from
= get_parent()->whoami_shard();
495 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
496 msg
->pushes
.swap(i
->second
);
497 msg
->compute_cost(cct
);
498 get_parent()->send_message(
502 map
<int, MOSDPGPushReply
*> replies
;
503 for (map
<pg_shard_t
, vector
<PushReplyOp
> >::iterator i
=
504 m
.push_replies
.begin();
505 i
!= m
.push_replies
.end();
506 m
.push_replies
.erase(i
++)) {
507 MOSDPGPushReply
*msg
= new MOSDPGPushReply();
508 msg
->set_priority(priority
);
509 msg
->map_epoch
= get_parent()->get_epoch();
510 msg
->min_epoch
= get_parent()->get_last_peering_reset_epoch();
511 msg
->from
= get_parent()->whoami_shard();
512 msg
->pgid
= spg_t(get_parent()->get_info().pgid
.pgid
, i
->first
.shard
);
513 msg
->replies
.swap(i
->second
);
514 msg
->compute_cost(cct
);
515 replies
.insert(make_pair(i
->first
.osd
, msg
));
518 if (!replies
.empty()) {
519 (m
.t
).register_on_complete(
520 get_parent()->bless_context(
523 get_parent()->get_epoch(),
525 get_parent()->queue_transaction(std::move(m
.t
));
538 void ECBackend::continue_recovery_op(
542 dout(10) << __func__
<< ": continuing " << op
<< dendl
;
545 case RecoveryOp::IDLE
: {
547 op
.state
= RecoveryOp::READING
;
548 assert(!op
.recovery_progress
.data_complete
);
549 set
<int> want(op
.missing_on_shards
.begin(), op
.missing_on_shards
.end());
550 uint64_t from
= op
.recovery_progress
.data_recovered_to
;
551 uint64_t amount
= get_recovery_chunk_size();
553 if (op
.recovery_progress
.first
&& op
.obc
) {
554 /* We've got the attrs and the hinfo, might as well use them */
555 op
.hinfo
= get_hash_info(op
.hoid
);
557 op
.xattrs
= op
.obc
->attr_cache
;
558 ::encode(*(op
.hinfo
), op
.xattrs
[ECUtil::get_hinfo_key()]);
561 set
<pg_shard_t
> to_read
;
562 int r
= get_min_avail_to_read_shards(
563 op
.hoid
, want
, true, false, &to_read
);
565 // we must have lost a recovery source
566 assert(!op
.recovery_progress
.first
);
567 dout(10) << __func__
<< ": canceling recovery op for obj " << op
.hoid
569 get_parent()->cancel_pull(op
.hoid
);
570 recovery_ops
.erase(op
.hoid
);
576 op
.recovery_progress
.data_recovered_to
,
580 op
.recovery_progress
.first
&& !op
.obc
);
581 op
.extent_requested
= make_pair(
584 dout(10) << __func__
<< ": IDLE return " << op
<< dendl
;
587 case RecoveryOp::READING
: {
588 // read completed, start write
589 assert(op
.xattrs
.size());
590 assert(op
.returned_data
.size());
591 op
.state
= RecoveryOp::WRITING
;
592 ObjectRecoveryProgress after_progress
= op
.recovery_progress
;
593 after_progress
.data_recovered_to
+= op
.extent_requested
.second
;
594 after_progress
.first
= false;
595 if (after_progress
.data_recovered_to
>= op
.obc
->obs
.oi
.size
) {
596 after_progress
.data_recovered_to
=
597 sinfo
.logical_to_next_stripe_offset(
598 op
.obc
->obs
.oi
.size
);
599 after_progress
.data_complete
= true;
601 for (set
<pg_shard_t
>::iterator mi
= op
.missing_on
.begin();
602 mi
!= op
.missing_on
.end();
604 assert(op
.returned_data
.count(mi
->shard
));
605 m
->pushes
[*mi
].push_back(PushOp());
606 PushOp
&pop
= m
->pushes
[*mi
].back();
609 pop
.data
= op
.returned_data
[mi
->shard
];
610 dout(10) << __func__
<< ": before_progress=" << op
.recovery_progress
611 << ", after_progress=" << after_progress
612 << ", pop.data.length()=" << pop
.data
.length()
613 << ", size=" << op
.obc
->obs
.oi
.size
<< dendl
;
616 sinfo
.aligned_logical_offset_to_chunk_offset(
617 after_progress
.data_recovered_to
-
618 op
.recovery_progress
.data_recovered_to
)
620 if (pop
.data
.length())
621 pop
.data_included
.insert(
622 sinfo
.aligned_logical_offset_to_chunk_offset(
623 op
.recovery_progress
.data_recovered_to
),
626 if (op
.recovery_progress
.first
) {
627 pop
.attrset
= op
.xattrs
;
629 pop
.recovery_info
= op
.recovery_info
;
630 pop
.before_progress
= op
.recovery_progress
;
631 pop
.after_progress
= after_progress
;
632 if (*mi
!= get_parent()->primary_shard())
633 get_parent()->begin_peer_recover(
637 op
.returned_data
.clear();
638 op
.waiting_on_pushes
= op
.missing_on
;
639 op
.recovery_progress
= after_progress
;
640 dout(10) << __func__
<< ": READING return " << op
<< dendl
;
643 case RecoveryOp::WRITING
: {
644 if (op
.waiting_on_pushes
.empty()) {
645 if (op
.recovery_progress
.data_complete
) {
646 op
.state
= RecoveryOp::COMPLETE
;
647 for (set
<pg_shard_t
>::iterator i
= op
.missing_on
.begin();
648 i
!= op
.missing_on
.end();
650 if (*i
!= get_parent()->primary_shard()) {
651 dout(10) << __func__
<< ": on_peer_recover on " << *i
652 << ", obj " << op
.hoid
<< dendl
;
653 get_parent()->on_peer_recover(
659 object_stat_sum_t stat
;
660 stat
.num_bytes_recovered
= op
.recovery_info
.size
;
661 stat
.num_keys_recovered
= 0; // ??? op ... omap_entries.size(); ?
662 stat
.num_objects_recovered
= 1;
663 get_parent()->on_global_recover(op
.hoid
, stat
, false);
664 dout(10) << __func__
<< ": WRITING return " << op
<< dendl
;
665 recovery_ops
.erase(op
.hoid
);
668 op
.state
= RecoveryOp::IDLE
;
669 dout(10) << __func__
<< ": WRITING continue " << op
<< dendl
;
675 // should never be called once complete
676 case RecoveryOp::COMPLETE
:
684 void ECBackend::run_recovery_op(
688 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
690 for (list
<RecoveryOp
>::iterator i
= h
->ops
.begin();
693 dout(10) << __func__
<< ": starting " << *i
<< dendl
;
694 assert(!recovery_ops
.count(i
->hoid
));
695 RecoveryOp
&op
= recovery_ops
.insert(make_pair(i
->hoid
, *i
)).first
->second
;
696 continue_recovery_op(op
, &m
);
699 dispatch_recovery_messages(m
, priority
);
700 send_recovery_deletes(priority
, h
->deletes
);
704 int ECBackend::recover_object(
705 const hobject_t
&hoid
,
707 ObjectContextRef head
,
708 ObjectContextRef obc
,
711 ECRecoveryHandle
*h
= static_cast<ECRecoveryHandle
*>(_h
);
712 h
->ops
.push_back(RecoveryOp());
714 h
->ops
.back().hoid
= hoid
;
715 h
->ops
.back().obc
= obc
;
716 h
->ops
.back().recovery_info
.soid
= hoid
;
717 h
->ops
.back().recovery_info
.version
= v
;
719 h
->ops
.back().recovery_info
.size
= obc
->obs
.oi
.size
;
720 h
->ops
.back().recovery_info
.oi
= obc
->obs
.oi
;
722 if (hoid
.is_snap()) {
725 h
->ops
.back().recovery_info
.ss
= obc
->ssc
->snapset
;
728 h
->ops
.back().recovery_info
.ss
= head
->ssc
->snapset
;
730 assert(0 == "neither obc nor head set for a snap object");
733 h
->ops
.back().recovery_progress
.omap_complete
= true;
734 for (set
<pg_shard_t
>::const_iterator i
=
735 get_parent()->get_actingbackfill_shards().begin();
736 i
!= get_parent()->get_actingbackfill_shards().end();
738 dout(10) << "checking " << *i
<< dendl
;
739 if (get_parent()->get_shard_missing(*i
).is_missing(hoid
)) {
740 h
->ops
.back().missing_on
.insert(*i
);
741 h
->ops
.back().missing_on_shards
.insert(i
->shard
);
744 dout(10) << __func__
<< ": built op " << h
->ops
.back() << dendl
;
748 bool ECBackend::can_handle_while_inactive(
754 bool ECBackend::_handle_message(
757 dout(10) << __func__
<< ": " << *_op
->get_req() << dendl
;
758 int priority
= _op
->get_req()->get_priority();
759 switch (_op
->get_req()->get_type()) {
760 case MSG_OSD_EC_WRITE
: {
761 // NOTE: this is non-const because handle_sub_write modifies the embedded
762 // ObjectStore::Transaction in place (and then std::move's it). It does
763 // not conflict with ECSubWrite's operator<<.
764 MOSDECSubOpWrite
*op
= static_cast<MOSDECSubOpWrite
*>(
765 _op
->get_nonconst_req());
766 parent
->maybe_preempt_replica_scrub(op
->op
.soid
);
767 handle_sub_write(op
->op
.from
, _op
, op
->op
, _op
->pg_trace
);
770 case MSG_OSD_EC_WRITE_REPLY
: {
771 const MOSDECSubOpWriteReply
*op
= static_cast<const MOSDECSubOpWriteReply
*>(
773 handle_sub_write_reply(op
->op
.from
, op
->op
, _op
->pg_trace
);
776 case MSG_OSD_EC_READ
: {
777 const MOSDECSubOpRead
*op
= static_cast<const MOSDECSubOpRead
*>(_op
->get_req());
778 MOSDECSubOpReadReply
*reply
= new MOSDECSubOpReadReply
;
779 reply
->pgid
= get_parent()->primary_spg_t();
780 reply
->map_epoch
= get_parent()->get_epoch();
781 reply
->min_epoch
= get_parent()->get_interval_start_epoch();
782 handle_sub_read(op
->op
.from
, op
->op
, &(reply
->op
), _op
->pg_trace
);
783 reply
->trace
= _op
->pg_trace
;
784 get_parent()->send_message_osd_cluster(
785 op
->op
.from
.osd
, reply
, get_parent()->get_epoch());
788 case MSG_OSD_EC_READ_REPLY
: {
789 // NOTE: this is non-const because handle_sub_read_reply steals resulting
790 // buffers. It does not conflict with ECSubReadReply operator<<.
791 MOSDECSubOpReadReply
*op
= static_cast<MOSDECSubOpReadReply
*>(
792 _op
->get_nonconst_req());
794 handle_sub_read_reply(op
->op
.from
, op
->op
, &rm
, _op
->pg_trace
);
795 dispatch_recovery_messages(rm
, priority
);
798 case MSG_OSD_PG_PUSH
: {
799 const MOSDPGPush
*op
= static_cast<const MOSDPGPush
*>(_op
->get_req());
801 for (vector
<PushOp
>::const_iterator i
= op
->pushes
.begin();
802 i
!= op
->pushes
.end();
804 handle_recovery_push(*i
, &rm
);
806 dispatch_recovery_messages(rm
, priority
);
809 case MSG_OSD_PG_PUSH_REPLY
: {
810 const MOSDPGPushReply
*op
= static_cast<const MOSDPGPushReply
*>(
813 for (vector
<PushReplyOp
>::const_iterator i
= op
->replies
.begin();
814 i
!= op
->replies
.end();
816 handle_recovery_push_reply(*i
, op
->from
, &rm
);
818 dispatch_recovery_messages(rm
, priority
);
827 struct SubWriteCommitted
: public Context
{
832 eversion_t last_complete
;
833 const ZTracer::Trace trace
;
839 eversion_t last_complete
,
840 const ZTracer::Trace
&trace
)
841 : pg(pg
), msg(msg
), tid(tid
),
842 version(version
), last_complete(last_complete
), trace(trace
) {}
843 void finish(int) override
{
845 msg
->mark_event("sub_op_committed");
846 pg
->sub_write_committed(tid
, version
, last_complete
, trace
);
849 void ECBackend::sub_write_committed(
850 ceph_tid_t tid
, eversion_t version
, eversion_t last_complete
,
851 const ZTracer::Trace
&trace
) {
852 if (get_parent()->pgb_is_primary()) {
853 ECSubWriteReply reply
;
855 reply
.last_complete
= last_complete
;
856 reply
.committed
= true;
857 reply
.from
= get_parent()->whoami_shard();
858 handle_sub_write_reply(
859 get_parent()->whoami_shard(),
862 get_parent()->update_last_complete_ondisk(last_complete
);
863 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
864 r
->pgid
= get_parent()->primary_spg_t();
865 r
->map_epoch
= get_parent()->get_epoch();
866 r
->min_epoch
= get_parent()->get_interval_start_epoch();
868 r
->op
.last_complete
= last_complete
;
869 r
->op
.committed
= true;
870 r
->op
.from
= get_parent()->whoami_shard();
871 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
873 r
->trace
.event("sending sub op commit");
874 get_parent()->send_message_osd_cluster(
875 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
879 struct SubWriteApplied
: public Context
{
884 const ZTracer::Trace trace
;
890 const ZTracer::Trace
&trace
)
891 : pg(pg
), msg(msg
), tid(tid
), version(version
), trace(trace
) {}
892 void finish(int) override
{
894 msg
->mark_event("sub_op_applied");
895 pg
->sub_write_applied(tid
, version
, trace
);
898 void ECBackend::sub_write_applied(
899 ceph_tid_t tid
, eversion_t version
,
900 const ZTracer::Trace
&trace
) {
901 parent
->op_applied(version
);
902 if (get_parent()->pgb_is_primary()) {
903 ECSubWriteReply reply
;
904 reply
.from
= get_parent()->whoami_shard();
906 reply
.applied
= true;
907 handle_sub_write_reply(
908 get_parent()->whoami_shard(),
911 MOSDECSubOpWriteReply
*r
= new MOSDECSubOpWriteReply
;
912 r
->pgid
= get_parent()->primary_spg_t();
913 r
->map_epoch
= get_parent()->get_epoch();
914 r
->min_epoch
= get_parent()->get_interval_start_epoch();
915 r
->op
.from
= get_parent()->whoami_shard();
917 r
->op
.applied
= true;
918 r
->set_priority(CEPH_MSG_PRIO_HIGH
);
920 r
->trace
.event("sending sub op apply");
921 get_parent()->send_message_osd_cluster(
922 get_parent()->primary_shard().osd
, r
, get_parent()->get_epoch());
926 void ECBackend::handle_sub_write(
930 const ZTracer::Trace
&trace
,
931 Context
*on_local_applied_sync
)
935 trace
.event("handle_sub_write");
936 assert(!get_parent()->get_log().get_missing().is_missing(op
.soid
));
937 if (!get_parent()->pgb_is_primary())
938 get_parent()->update_stats(op
.stats
);
939 ObjectStore::Transaction localt
;
940 if (!op
.temp_added
.empty()) {
941 add_temp_objs(op
.temp_added
);
944 for (set
<hobject_t
>::iterator i
= op
.temp_removed
.begin();
945 i
!= op
.temp_removed
.end();
947 dout(10) << __func__
<< ": removing object " << *i
948 << " since we won't get the transaction" << dendl
;
954 get_parent()->whoami_shard().shard
));
957 clear_temp_objs(op
.temp_removed
);
958 get_parent()->log_operation(
960 op
.updated_hit_set_history
,
966 PrimaryLogPG
*_rPG
= dynamic_cast<PrimaryLogPG
*>(get_parent());
967 if (_rPG
&& !_rPG
->is_undersized() &&
968 (unsigned)get_parent()->whoami_shard().shard
>= ec_impl
->get_data_chunk_count())
969 op
.t
.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
971 if (on_local_applied_sync
) {
972 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync
<< dendl
;
973 localt
.register_on_applied_sync(on_local_applied_sync
);
975 localt
.register_on_commit(
976 get_parent()->bless_context(
977 new SubWriteCommitted(
980 get_parent()->get_info().last_complete
, trace
)));
981 localt
.register_on_applied(
982 get_parent()->bless_context(
983 new SubWriteApplied(this, msg
, op
.tid
, op
.at_version
, trace
)));
984 vector
<ObjectStore::Transaction
> tls
;
986 tls
.push_back(std::move(op
.t
));
987 tls
.push_back(std::move(localt
));
988 get_parent()->queue_transactions(tls
, msg
);
991 void ECBackend::handle_sub_read(
994 ECSubReadReply
*reply
,
995 const ZTracer::Trace
&trace
)
997 trace
.event("handle sub read");
998 shard_id_t shard
= get_parent()->whoami_shard().shard
;
999 for(auto i
= op
.to_read
.begin();
1000 i
!= op
.to_read
.end();
1003 ECUtil::HashInfoRef hinfo
;
1004 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1005 hinfo
= get_hash_info(i
->first
);
1008 get_parent()->clog_error() << "Corruption detected: object " << i
->first
1009 << " is missing hash_info";
1010 dout(5) << __func__
<< ": No hinfo for " << i
->first
<< dendl
;
1014 for (auto j
= i
->second
.begin(); j
!= i
->second
.end(); ++j
) {
1018 ghobject_t(i
->first
, ghobject_t::NO_GEN
, shard
),
1023 get_parent()->clog_error() << "Error " << r
1024 << " reading object "
1026 dout(5) << __func__
<< ": Error " << r
1027 << " reading " << i
->first
<< dendl
;
1030 dout(20) << __func__
<< " read request=" << j
->get
<1>() << " r=" << r
<< " len=" << bl
.length() << dendl
;
1031 reply
->buffers_read
[i
->first
].push_back(
1038 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1039 // This shows that we still need deep scrub because large enough files
1040 // are read in sections, so the digest check here won't be done here.
1041 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1042 // the state of our chunk in case other chunks could substitute.
1043 assert(hinfo
->has_chunk_hash());
1044 if ((bl
.length() == hinfo
->get_total_chunk_size()) &&
1045 (j
->get
<0>() == 0)) {
1046 dout(20) << __func__
<< ": Checking hash of " << i
->first
<< dendl
;
1049 if (h
.digest() != hinfo
->get_chunk_hash(shard
)) {
1050 get_parent()->clog_error() << "Bad hash for " << i
->first
<< " digest 0x"
1051 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
;
1052 dout(5) << __func__
<< ": Bad hash for " << i
->first
<< " digest 0x"
1053 << hex
<< h
.digest() << " expected 0x" << hinfo
->get_chunk_hash(shard
) << dec
<< dendl
;
1062 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1063 // the state of our chunk in case other chunks could substitute.
1064 reply
->buffers_read
.erase(i
->first
);
1065 reply
->errors
[i
->first
] = r
;
1067 for (set
<hobject_t
>::iterator i
= op
.attrs_to_read
.begin();
1068 i
!= op
.attrs_to_read
.end();
1070 dout(10) << __func__
<< ": fulfilling attr request on "
1072 if (reply
->errors
.count(*i
))
1074 int r
= store
->getattrs(
1077 *i
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1078 reply
->attrs_read
[*i
]);
1080 reply
->buffers_read
.erase(*i
);
1081 reply
->errors
[*i
] = r
;
1084 reply
->from
= get_parent()->whoami_shard();
1085 reply
->tid
= op
.tid
;
1088 void ECBackend::handle_sub_write_reply(
1090 const ECSubWriteReply
&op
,
1091 const ZTracer::Trace
&trace
)
1093 map
<ceph_tid_t
, Op
>::iterator i
= tid_to_op_map
.find(op
.tid
);
1094 assert(i
!= tid_to_op_map
.end());
1096 trace
.event("sub write committed");
1097 assert(i
->second
.pending_commit
.count(from
));
1098 i
->second
.pending_commit
.erase(from
);
1099 if (from
!= get_parent()->whoami_shard()) {
1100 get_parent()->update_peer_last_complete_ondisk(from
, op
.last_complete
);
1104 trace
.event("sub write applied");
1105 assert(i
->second
.pending_apply
.count(from
));
1106 i
->second
.pending_apply
.erase(from
);
1109 if (i
->second
.pending_apply
.empty() && i
->second
.on_all_applied
) {
1110 dout(10) << __func__
<< " Calling on_all_applied on " << i
->second
<< dendl
;
1111 i
->second
.on_all_applied
->complete(0);
1112 i
->second
.on_all_applied
= 0;
1113 i
->second
.trace
.event("ec write all applied");
1115 if (i
->second
.pending_commit
.empty() && i
->second
.on_all_commit
) {
1116 dout(10) << __func__
<< " Calling on_all_commit on " << i
->second
<< dendl
;
1117 i
->second
.on_all_commit
->complete(0);
1118 i
->second
.on_all_commit
= 0;
1119 i
->second
.trace
.event("ec write all committed");
// Handle a single shard's reply to an erasure-coded sub-read.
//
// Merges the returned buffers/attrs/errors into the matching in-flight
// ReadOp (looked up by tid), then — once enough shards have answered —
// decides whether each object's read is decodable, retries with the
// remaining shards if not, and finally completes the ReadOp.
//
// NOTE(review): the `from`/`op` parameter lines were lost in extraction;
// reconstructed here from the body's uses (`op.tid`, `op.buffers_read`,
// `from` as a pg_shard_t key) — confirm against the upstream signature.
void ECBackend::handle_sub_read_reply(
  pg_shard_t from,
  ECSubReadReply &op,
  RecoveryMessages *m,
  const ZTracer::Trace &trace)
{
  trace.event("ec sub read reply");
  dout(10) << __func__ << ": reply " << op << dendl;
  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
  if (iter == tid_to_read_map.end()) {
    // The read was canceled (e.g. by filter_read_op); drop the stale reply.
    dout(20) << __func__ << ": dropped " << op << dendl;
    return;
  }
  ReadOp &rop = iter->second;

  // 1) Fold the returned data buffers into the per-object, per-extent
  //    "returned" lists, keyed by the replying shard.
  for (auto i = op.buffers_read.begin();
       i != op.buffers_read.end();
       ++i) {
    assert(!op.errors.count(i->first));  // If attribute error we better not have sent a buffer
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    // Walk the requested extents and the result slots in lockstep with the
    // buffers the shard sent back; order is guaranteed by the sub-read path.
    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
      rop.to_read.find(i->first)->second.to_read.begin();
    list<
      boost::tuple<
        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
      rop.complete[i->first].returned.begin();
    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
         j != i->second.end();
         ++j, ++req_iter, ++riter) {
      assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
      assert(riter != rop.complete[i->first].returned.end());
      // The shard returns chunk-space offsets; check they line up with the
      // chunk-aligned translation of what we asked for.
      pair<uint64_t, uint64_t> adjusted =
        sinfo.aligned_offset_len_to_chunk(
          make_pair(req_iter->get<0>(), req_iter->get<1>()));
      assert(adjusted.first == j->first);
      riter->get<2>()[from].claim(j->second);  // claim: move, no copy
    }
  }

  // 2) Fold in any xattrs the shard returned.
  for (auto i = op.attrs_read.begin();
       i != op.attrs_read.end();
       ++i) {
    assert(!op.errors.count(i->first));  // if read error better not have sent an attribute
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    rop.complete[i->first].attrs = map<string, bufferlist>();
    (*(rop.complete[i->first].attrs)).swap(i->second);
  }

  // 3) Record per-object errors reported by this shard.
  for (auto i = op.errors.begin();
       i != op.errors.end();
       ++i) {
    rop.complete[i->first].errors.insert(
      make_pair(from, i->second));
    dout(20) << __func__ << " shard=" << from << " error=" << i->second
             << dendl;
  }

  // This shard has now answered: drop it from the in-flight bookkeeping.
  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
    shard_to_read_map.find(from);
  assert(siter != shard_to_read_map.end());
  assert(siter->second.count(op.tid));
  siter->second.erase(op.tid);

  assert(rop.in_progress.count(from));
  rop.in_progress.erase(from);
  unsigned is_complete = 0;
  // For redundant reads check for completion as each shard comes in,
  // or in a non-recovery read check for completion once all the shards read.
  if (rop.do_redundant_reads || rop.in_progress.empty()) {
    for (map<hobject_t, read_result_t>::const_iterator iter =
           rop.complete.begin();
         iter != rop.complete.end();
         ++iter) {
      // Collect which shards contributed data for this object.
      set<int> have;
      for (map<pg_shard_t, bufferlist>::const_iterator j =
             iter->second.returned.front().get<2>().begin();
           j != iter->second.returned.front().get<2>().end();
           ++j) {
        have.insert(j->first.shard);
        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
      }
      set<int> dummy_minimum;
      int err;
      // Ask the EC plugin whether the shards on hand suffice to decode.
      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first],
                                            have, &dummy_minimum)) < 0) {
        dout(20) << __func__ << " minimum_to_decode failed" << dendl;
        if (rop.in_progress.empty()) {
          // If we don't have enough copies and we haven't sent reads for all shards
          // we can send the rest of the reads, if any.
          if (!rop.do_redundant_reads) {
            int r = send_all_remaining_reads(iter->first, rop);
            if (r == 0) {
              // We added to in_progress and not incrementing is_complete
              continue;
            }
            // Couldn't read any additional shards so handle as completed with errors
          }
          // We don't want to confuse clients / RBD with objectstore error
          // values in particular ENOENT. We may have different error returns
          // from different shards, so we'll return minimum_to_decode() error
          // (usually EIO) to reader. It is likely an error here is due to a
          // damaged pg.
          rop.complete[iter->first].r = err;
          ++is_complete;
        }
      } else {
        assert(rop.complete[iter->first].r == 0);
        if (!rop.complete[iter->first].errors.empty()) {
          if (cct->_conf->osd_read_ec_check_for_errors) {
            // Strict mode: surface one of the shard errors to the reader
            // even though enough shards succeeded to decode.
            dout(10) << __func__ << ": Not ignoring errors, use one shard err="
                     << err << dendl;
            err = rop.complete[iter->first].errors.begin()->second;
            rop.complete[iter->first].r = err;
          } else {
            // Default: we can decode without the failed shards, so warn and
            // drop the recorded errors.
            get_parent()->clog_warn() << "Error(s) ignored for "
                                      << iter->first
                                      << " enough copies available";
            dout(10) << __func__ << " Error(s) ignored for " << iter->first
                     << " enough copies available" << dendl;
            rop.complete[iter->first].errors.clear();
          }
        }
        ++is_complete;
      }
    }
  }
  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
    dout(20) << __func__ << " Complete: " << rop << dendl;
    rop.trace.event("ec read complete");
    complete_read_op(rop, m);
  } else {
    dout(10) << __func__ << " readop not complete: " << rop << dendl;
  }
}
// Finish a ReadOp: fire each per-object completion callback with the
// corresponding read_result_t, then remove the op from tid_to_read_map.
//
// to_read and complete are parallel maps over the same hobject_t keys
// (asserted below), so the two iterators advance in lockstep.
void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
{
  map<hobject_t, read_request_t>::iterator reqiter =
    rop.to_read.begin();
  map<hobject_t, read_result_t>::iterator resiter =
    rop.complete.begin();
  assert(rop.to_read.size() == rop.complete.size());
  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
    if (reqiter->second.cb) {
      // Callback receives the RecoveryMessages accumulator and a reference
      // to the result; complete() deletes the callback object.
      pair<RecoveryMessages *, read_result_t &> arg(
        m, resiter->second);
      reqiter->second.cb->complete(arg);
      // Null the pointer so on_change()'s cleanup path won't double-delete.
      reqiter->second.cb = NULL;
    }
  }
  tid_to_read_map.erase(rop.tid);
}
// Deferred completion of a filtered ReadOp, run on the work queue: looks
// the op back up by tid (it must still exist), completes it, and
// dispatches any recovery messages the callbacks produced.
// Scheduled by filter_read_op() when all remaining sources went down.
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
  ECBackend *ec;   // non-owning backpointer; outlives this context
  ceph_tid_t tid;  // key of the ReadOp to complete
  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
  void finish(ThreadPool::TPHandle &handle) override {
    auto ropiter = ec->tid_to_read_map.find(tid);
    assert(ropiter != ec->tid_to_read_map.end());
    // Save priority before complete_read_op erases the map entry.
    int priority = ropiter->second.priority;
    RecoveryMessages rm;
    ec->complete_read_op(ropiter->second, &rm);
    ec->dispatch_recovery_messages(rm, priority);
  }
};
// Prune an in-flight ReadOp after an OSDMap change: any object whose
// source shard is now down gets its read canceled, and if nothing is
// left in flight the op is completed asynchronously via FinishReadOp.
//
// NOTE(review): the second parameter line (`ReadOp &op`) and a few
// brace/increment lines were lost in extraction and are reconstructed
// from the body's uses — confirm against upstream.
void ECBackend::filter_read_op(
  const OSDMapRef& osdmap,
  ReadOp &op)
{
  // Pass 1: collect every object that was being read from a now-down OSD.
  set<hobject_t> to_cancel;
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ++i) {
    if (osdmap->is_down(i->first.osd)) {
      to_cancel.insert(i->second.begin(), i->second.end());
      op.in_progress.erase(i->first);
      continue;
    }
  }

  if (to_cancel.empty())
    return;

  // Pass 2: strip the canceled objects out of source_to_obj, dropping
  // source entries that become empty (hence the manual ++i/erase(i++)).
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ) {
    for (set<hobject_t>::iterator j = i->second.begin();
         j != i->second.end();
         ) {
      if (to_cancel.count(*j))
        i->second.erase(j++);
      else
        ++j;
    }
    if (i->second.empty()) {
      op.source_to_obj.erase(i++);
    } else {
      // Any surviving source must still be up.
      assert(!osdmap->is_down(i->first.osd));
      ++i;
    }
  }

  // Pass 3: cancel each affected object's read and drop its state.
  for (set<hobject_t>::iterator i = to_cancel.begin();
       i != to_cancel.end();
       ++i) {
    get_parent()->cancel_pull(*i);

    assert(op.to_read.count(*i));
    read_request_t &req = op.to_read.find(*i)->second;
    dout(10) << __func__ << ": canceling " << req
             << "  for obj " << *i << dendl;
    // The callback will never be invoked now; free it directly.
    delete req.cb;
    req.cb = nullptr;

    op.to_read.erase(*i);
    op.complete.erase(*i);
    recovery_ops.erase(*i);
  }

  if (op.in_progress.empty()) {
    // Nothing left outstanding: finish the op off the fast path, on the
    // work queue, so completion runs in a normal callback context.
    get_parent()->schedule_recovery_work(
      get_parent()->bless_gencontext(
        new FinishReadOp(this, op.tid)));
  }
}
// React to an OSDMap update: find every read tid that was using a shard
// on a now-down OSD, drop those shards from shard_to_read_map, and run
// filter_read_op() on each affected ReadOp.
void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  // Collect tids first; filter_read_op() below mutates shard_to_read_map,
  // so we must not call it while iterating.
  set<ceph_tid_t> tids_to_filter;
  for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
       i = shard_to_read_map.begin();
       i != shard_to_read_map.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      tids_to_filter.insert(i->second.begin(), i->second.end());
      shard_to_read_map.erase(i++);  // erase while iterating: post-increment
    } else {
      ++i;
    }
  }
  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
       i != tids_to_filter.end();
       ++i) {
    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
    assert(j != tid_to_read_map.end());
    filter_read_op(osdmap, j->second);
  }
}
// Interval change: abandon all in-flight state. Resets the write
// pipeline, releases cache pins for pending writes, frees the callbacks
// of every outstanding read, and clears recovery state. Callbacks are
// deleted (not completed) — the new interval will restart any work.
void ECBackend::on_change()
{
  dout(10) << __func__ << dendl;

  // Reset the write pipeline.
  completed_to = eversion_t();
  committed_to = eversion_t();
  pipeline_state.clear();
  waiting_reads.clear();
  waiting_state.clear();
  waiting_commit.clear();
  for (auto &&op: tid_to_op_map) {
    // Each queued write holds a cache write pin; release before dropping.
    cache.release_write_pin(op.second.pin);
  }
  tid_to_op_map.clear();

  // Cancel every outstanding read: delete (don't complete) the callbacks.
  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    dout(10) << __func__ << ": cancelling " << i->second << dendl;
    for (map<hobject_t, read_request_t>::iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      delete j->second.cb;
      j->second.cb = nullptr;
    }
  }
  tid_to_read_map.clear();
  in_progress_client_reads.clear();
  shard_to_read_map.clear();
  clear_recovery_state();
}
// Drop all per-object recovery bookkeeping (used on interval change).
void ECBackend::clear_recovery_state()
{
  recovery_ops.clear();
}
1418 void ECBackend::on_flushed()
1422 void ECBackend::dump_recovery_info(Formatter
*f
) const
1424 f
->open_array_section("recovery_ops");
1425 for (map
<hobject_t
, RecoveryOp
>::const_iterator i
= recovery_ops
.begin();
1426 i
!= recovery_ops
.end();
1428 f
->open_object_section("op");
1433 f
->open_array_section("read_ops");
1434 for (map
<ceph_tid_t
, ReadOp
>::const_iterator i
= tid_to_read_map
.begin();
1435 i
!= tid_to_read_map
.end();
1437 f
->open_object_section("read_op");
// PGBackend entry point for a client write: record the op's metadata and
// completion callbacks in tid_to_op_map, then feed it into the EC
// read-modify-write pipeline via start_rmw().
//
// NOTE(review): the `tid`/`reqid` parameter lines and the corresponding
// `op->hoid/tid/reqid` assignments were lost in extraction; reconstructed
// from the body's uses (`tid_to_op_map[tid]`) — confirm against upstream.
void ECBackend::submit_transaction(
  const hobject_t &hoid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_applied,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef client_op
  )
{
  assert(!tid_to_op_map.count(tid));
  Op *op = &(tid_to_op_map[tid]);  // default-construct in place, fill below
  op->hoid = hoid;
  op->delta_stats = delta_stats;
  op->version = at_version;
  op->trim_to = trim_to;
  // Never roll the target backwards past what we've already committed.
  op->roll_forward_to = MAX(roll_forward_to, committed_to);
  op->log_entries = log_entries;
  // Take ownership of the hit-set history rather than copying it.
  std::swap(op->updated_hit_set_history, hset_history);
  op->on_local_applied_sync = on_local_applied_sync;
  op->on_all_applied = on_all_applied;
  op->on_all_commit = on_all_commit;
  op->tid = tid;
  op->reqid = reqid;
  op->client_op = client_op;
  if (client_op)
    op->trace = client_op->pg_trace;
  
  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
  start_rmw(op, std::move(t));
  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
}
// Run `cb` ordered with respect to in-flight writes: if any write is
// still queued in an earlier pipeline stage, attach the callback to the
// last such op (it fires when that op reaches the write stage);
// otherwise nothing is ahead of us, so invoke it immediately.
void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
  if (!waiting_state.empty()) {
    waiting_state.back().on_write.emplace_back(std::move(cb));
  } else if (!waiting_reads.empty()) {
    waiting_reads.back().on_write.emplace_back(std::move(cb));
  } else {
    // Nothing earlier in the pipeline, just call it
    cb();
  }
}
// Enumerate every shard from which `hoid` can currently be read.
//
// Fills `have` with the shard ids and `shards` with shard-id -> pg_shard
// mappings. Always considers the acting set; when `for_recovery` is set
// it additionally considers backfill peers (only up to their
// last_backfill) and any locations tracked in missing_loc. Shards listed
// in `error_shards` (previous read failures) are excluded throughout.
//
// NOTE(review): the `set<int> &have` and `bool for_recovery` parameter
// lines were lost in extraction; reconstructed from the call sites
// (`get_all_avail_shards(hoid, error_shards, have, shards, for_recovery)`).
void ECBackend::get_all_avail_shards(
  const hobject_t &hoid,
  const set<pg_shard_t> &error_shards,
  set<int> &have,
  map<shard_id_t, pg_shard_t> &shards,
  bool for_recovery)
{
  // Acting shards that aren't missing the object are always usable.
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (error_shards.find(*i) != error_shards.end())
      continue;
    if (!missing.is_missing(hoid)) {
      assert(!have.count(i->shard));
      have.insert(i->shard);
      assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (for_recovery) {
    // Backfill peers are usable only for objects below their backfill
    // horizon (and not missing there).
    for (set<pg_shard_t>::const_iterator i =
           get_parent()->get_backfill_shards().begin();
         i != get_parent()->get_backfill_shards().end();
         ++i) {
      if (error_shards.find(*i) != error_shards.end())
        continue;
      if (have.count(i->shard)) {
        // Already found this shard id in the acting set; keep that source.
        assert(shards.count(i->shard));
        continue;
      }
      dout(10) << __func__ << ": checking backfill " << *i << dendl;
      assert(!shards.count(i->shard));
      const pg_info_t &info = get_parent()->get_shard_info(*i);
      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
      if (hoid < info.last_backfill &&
          !missing.is_missing(hoid)) {
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }

    // Finally, consult missing_loc for any other peers known to hold it.
    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
      get_parent()->get_missing_loc_shards().find(hoid);
    if (miter != get_parent()->get_missing_loc_shards().end()) {
      for (set<pg_shard_t>::iterator i = miter->second.begin();
           i != miter->second.end();
           ++i) {
        dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
        auto m = get_parent()->maybe_get_shard_missing(*i);
        if (m) {
          // missing_loc said this peer has it, so it must not be missing.
          assert(!(*m).is_missing(hoid));
        }
        if (error_shards.find(*i) != error_shards.end())
          continue;
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }
  }
}
// Pick the set of shards to read for `hoid` so that `want` can be
// decoded. Normally asks the EC plugin for the minimum shard set; with
// `do_redundant_reads` it reads from all available shards instead (so a
// slow/failed shard doesn't add a round trip). Returns 0 on success or
// the plugin's negative error if not enough shards are available.
//
// NOTE(review): the `bool for_recovery` parameter line and a couple of
// declarations were lost in extraction; reconstructed from the asserts
// and the get_all_avail_shards() call — confirm against upstream.
int ECBackend::get_min_avail_to_read_shards(
  const hobject_t &hoid,
  const set<int> &want,
  bool for_recovery,
  bool do_redundant_reads,
  set<pg_shard_t> *to_read)
{
  // Make sure we don't do redundant reads for recovery
  assert(!for_recovery || !do_redundant_reads);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  set<int> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0)
    return r;

  if (do_redundant_reads) {
    // Read everything we can see, not just the minimum.
    need.swap(have);
  }

  if (!to_read)
    return 0;  // caller only wanted the feasibility check

  for (set<int>::iterator i = need.begin();
       i != need.end();
       ++i) {
    assert(shards.count(shard_id_t(*i)));
    to_read->insert(shards[shard_id_t(*i)]);
  }
  return 0;
}
// After a failed decode attempt, compute which additional shards to read:
// the minimum decodable set recomputed with the error shards excluded,
// minus the shards in `avail` that were already read. Returns 0 and fills
// `to_read`, or -EIO if even the remaining shards can't decode `want`.
//
// NOTE(review): the trailing `bool for_recovery` parameter line was lost
// in extraction; reconstructed from the get_all_avail_shards() call.
int ECBackend::get_remaining_shards(
  const hobject_t &hoid,
  const set<int> &avail,
  const set<int> &want,
  const read_result_t &result,
  set<pg_shard_t> *to_read,
  bool for_recovery)
{
  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  // Shards that already returned errors must not be retried.
  set<pg_shard_t> error_shards;
  for (auto &p : result.errors) {
    error_shards.insert(p.first);
  }

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  set<int> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0) {
    dout(0) << __func__ << " not enough shards left to try for " << hoid
            << " read result was " << result << dendl;
    // Normalize to EIO for the reader rather than leaking plugin errnos.
    return -EIO;
  }

  // Only the needed shards we have NOT already read.
  set<int> shards_left;
  for (auto p : need) {
    if (avail.find(p) == avail.end()) {
      shards_left.insert(p);
    }
  }

  for (set<int>::iterator i = shards_left.begin();
       i != shards_left.end();
       ++i) {
    assert(shards.count(shard_id_t(*i)));
    assert(avail.find(*i) == avail.end());
    to_read->insert(shards[shard_id_t(*i)]);
  }
  return 0;
}
1641 void ECBackend::start_read_op(
1643 map
<hobject_t
, set
<int>> &want_to_read
,
1644 map
<hobject_t
, read_request_t
> &to_read
,
1646 bool do_redundant_reads
,
1649 ceph_tid_t tid
= get_parent()->get_tid();
1650 assert(!tid_to_read_map
.count(tid
));
1651 auto &op
= tid_to_read_map
.emplace(
1659 std::move(want_to_read
),
1660 std::move(to_read
))).first
->second
;
1661 dout(10) << __func__
<< ": starting " << op
<< dendl
;
1663 op
.trace
= _op
->pg_trace
;
1664 op
.trace
.event("start ec read");
1669 void ECBackend::do_read_op(ReadOp
&op
)
1671 int priority
= op
.priority
;
1672 ceph_tid_t tid
= op
.tid
;
1674 dout(10) << __func__
<< ": starting read " << op
<< dendl
;
1676 map
<pg_shard_t
, ECSubRead
> messages
;
1677 for (map
<hobject_t
, read_request_t
>::iterator i
= op
.to_read
.begin();
1678 i
!= op
.to_read
.end();
1680 bool need_attrs
= i
->second
.want_attrs
;
1681 for (set
<pg_shard_t
>::const_iterator j
= i
->second
.need
.begin();
1682 j
!= i
->second
.need
.end();
1685 messages
[*j
].attrs_to_read
.insert(i
->first
);
1688 op
.obj_to_source
[i
->first
].insert(*j
);
1689 op
.source_to_obj
[*j
].insert(i
->first
);
1691 for (list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >::const_iterator j
=
1692 i
->second
.to_read
.begin();
1693 j
!= i
->second
.to_read
.end();
1695 pair
<uint64_t, uint64_t> chunk_off_len
=
1696 sinfo
.aligned_offset_len_to_chunk(make_pair(j
->get
<0>(), j
->get
<1>()));
1697 for (set
<pg_shard_t
>::const_iterator k
= i
->second
.need
.begin();
1698 k
!= i
->second
.need
.end();
1700 messages
[*k
].to_read
[i
->first
].push_back(
1702 chunk_off_len
.first
,
1703 chunk_off_len
.second
,
1706 assert(!need_attrs
);
1710 for (map
<pg_shard_t
, ECSubRead
>::iterator i
= messages
.begin();
1711 i
!= messages
.end();
1713 op
.in_progress
.insert(i
->first
);
1714 shard_to_read_map
[i
->first
].insert(op
.tid
);
1715 i
->second
.tid
= tid
;
1716 MOSDECSubOpRead
*msg
= new MOSDECSubOpRead
;
1717 msg
->set_priority(priority
);
1719 get_parent()->whoami_spg_t().pgid
,
1721 msg
->map_epoch
= get_parent()->get_epoch();
1722 msg
->min_epoch
= get_parent()->get_interval_start_epoch();
1723 msg
->op
= i
->second
;
1724 msg
->op
.from
= get_parent()->whoami_shard();
1727 // initialize a child span for this shard
1728 msg
->trace
.init("ec sub read", nullptr, &op
.trace
);
1729 msg
->trace
.keyval("shard", i
->first
.shard
.id
);
1731 get_parent()->send_message_osd_cluster(
1734 get_parent()->get_epoch());
1736 dout(10) << __func__
<< ": started " << op
<< dendl
;
1739 ECUtil::HashInfoRef
ECBackend::get_hash_info(
1740 const hobject_t
&hoid
, bool checks
, const map
<string
,bufferptr
> *attrs
)
1742 dout(10) << __func__
<< ": Getting attr on " << hoid
<< dendl
;
1743 ECUtil::HashInfoRef ref
= unstable_hashinfo_registry
.lookup(hoid
);
1745 dout(10) << __func__
<< ": not in cache " << hoid
<< dendl
;
1747 int r
= store
->stat(
1749 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1751 ECUtil::HashInfo
hinfo(ec_impl
->get_chunk_count());
1752 // XXX: What does it mean if there is no object on disk?
1754 dout(10) << __func__
<< ": found on disk, size " << st
.st_size
<< dendl
;
1757 map
<string
, bufferptr
>::const_iterator k
= attrs
->find(ECUtil::get_hinfo_key());
1758 if (k
== attrs
->end()) {
1759 dout(5) << __func__
<< " " << hoid
<< " missing hinfo attr" << dendl
;
1761 bl
.push_back(k
->second
);
1766 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
1767 ECUtil::get_hinfo_key(),
1770 dout(5) << __func__
<< ": getattr failed: " << cpp_strerror(r
) << dendl
;
1771 bl
.clear(); // just in case
1774 if (bl
.length() > 0) {
1775 bufferlist::iterator bp
= bl
.begin();
1777 ::decode(hinfo
, bp
);
1779 dout(0) << __func__
<< ": Can't decode hinfo for " << hoid
<< dendl
;
1780 return ECUtil::HashInfoRef();
1782 if (checks
&& hinfo
.get_total_chunk_size() != (uint64_t)st
.st_size
) {
1783 dout(0) << __func__
<< ": Mismatch of total_chunk_size "
1784 << hinfo
.get_total_chunk_size() << dendl
;
1785 return ECUtil::HashInfoRef();
1787 } else if (st
.st_size
> 0) { // If empty object and no hinfo, create it
1788 return ECUtil::HashInfoRef();
1791 ref
= unstable_hashinfo_registry
.lookup_or_create(hoid
, hinfo
);
1796 void ECBackend::start_rmw(Op
*op
, PGTransactionUPtr
&&t
)
1800 op
->plan
= ECTransaction::get_write_plan(
1803 [&](const hobject_t
&i
) {
1804 ECUtil::HashInfoRef ref
= get_hash_info(i
, false);
1806 derr
<< __func__
<< ": get_hash_info(" << i
<< ")"
1807 << " returned a null pointer and there is no "
1808 << " way to recover from such an error in this "
1809 << " context" << dendl
;
1814 get_parent()->get_dpp());
1816 dout(10) << __func__
<< ": " << *op
<< dendl
;
1818 waiting_state
.push_back(*op
);
1822 bool ECBackend::try_state_to_reads()
1824 if (waiting_state
.empty())
1827 Op
*op
= &(waiting_state
.front());
1828 if (op
->requires_rmw() && pipeline_state
.cache_invalid()) {
1829 assert(get_parent()->get_pool().allows_ecoverwrites());
1830 dout(20) << __func__
<< ": blocking " << *op
1831 << " because it requires an rmw and the cache is invalid "
1837 if (op
->invalidates_cache()) {
1838 dout(20) << __func__
<< ": invalidating cache after this op"
1840 pipeline_state
.invalidate();
1841 op
->using_cache
= false;
1843 op
->using_cache
= pipeline_state
.caching_enabled();
1846 waiting_state
.pop_front();
1847 waiting_reads
.push_back(*op
);
1849 if (op
->using_cache
) {
1850 cache
.open_write_pin(op
->pin
);
1853 for (auto &&hpair
: op
->plan
.will_write
) {
1854 auto to_read_plan_iter
= op
->plan
.to_read
.find(hpair
.first
);
1855 const extent_set
&to_read_plan
=
1856 to_read_plan_iter
== op
->plan
.to_read
.end() ?
1858 to_read_plan_iter
->second
;
1860 extent_set remote_read
= cache
.reserve_extents_for_rmw(
1866 extent_set pending_read
= to_read_plan
;
1867 pending_read
.subtract(remote_read
);
1869 if (!remote_read
.empty()) {
1870 op
->remote_read
[hpair
.first
] = std::move(remote_read
);
1872 if (!pending_read
.empty()) {
1873 op
->pending_read
[hpair
.first
] = std::move(pending_read
);
1877 op
->remote_read
= op
->plan
.to_read
;
1880 dout(10) << __func__
<< ": " << *op
<< dendl
;
1882 if (!op
->remote_read
.empty()) {
1883 assert(get_parent()->get_pool().allows_ecoverwrites());
1884 objects_read_async_no_cache(
1886 [this, op
](map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
1887 for (auto &&i
: results
) {
1888 op
->remote_read_result
.emplace(i
.first
, i
.second
.second
);
1897 bool ECBackend::try_reads_to_commit()
1899 if (waiting_reads
.empty())
1901 Op
*op
= &(waiting_reads
.front());
1902 if (op
->read_in_progress())
1904 waiting_reads
.pop_front();
1905 waiting_commit
.push_back(*op
);
1907 dout(10) << __func__
<< ": starting commit on " << *op
<< dendl
;
1908 dout(20) << __func__
<< ": " << cache
<< dendl
;
1910 get_parent()->apply_stats(
1914 if (op
->using_cache
) {
1915 for (auto &&hpair
: op
->pending_read
) {
1916 op
->remote_read_result
[hpair
.first
].insert(
1917 cache
.get_remaining_extents_for_rmw(
1922 op
->pending_read
.clear();
1924 assert(op
->pending_read
.empty());
1927 map
<shard_id_t
, ObjectStore::Transaction
> trans
;
1928 for (set
<pg_shard_t
>::const_iterator i
=
1929 get_parent()->get_actingbackfill_shards().begin();
1930 i
!= get_parent()->get_actingbackfill_shards().end();
1935 op
->trace
.event("start ec write");
1937 map
<hobject_t
,extent_map
> written
;
1939 ECTransaction::generate_transactions(
1942 get_parent()->get_info().pgid
.pgid
,
1943 (get_osdmap()->require_osd_release
< CEPH_RELEASE_KRAKEN
),
1945 op
->remote_read_result
,
1950 &(op
->temp_cleared
),
1951 get_parent()->get_dpp());
1954 dout(20) << __func__
<< ": " << cache
<< dendl
;
1955 dout(20) << __func__
<< ": written: " << written
<< dendl
;
1956 dout(20) << __func__
<< ": op: " << *op
<< dendl
;
1958 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1959 for (auto &&i
: op
->log_entries
) {
1960 if (i
.requires_kraken()) {
1961 derr
<< __func__
<< ": log entry " << i
<< " requires kraken"
1962 << " but overwrites are not enabled!" << dendl
;
1968 map
<hobject_t
,extent_set
> written_set
;
1969 for (auto &&i
: written
) {
1970 written_set
[i
.first
] = i
.second
.get_interval_set();
1972 dout(20) << __func__
<< ": written_set: " << written_set
<< dendl
;
1973 assert(written_set
== op
->plan
.will_write
);
1975 if (op
->using_cache
) {
1976 for (auto &&hpair
: written
) {
1977 dout(20) << __func__
<< ": " << hpair
<< dendl
;
1978 cache
.present_rmw_update(hpair
.first
, op
->pin
, hpair
.second
);
1981 op
->remote_read
.clear();
1982 op
->remote_read_result
.clear();
1984 dout(10) << "onreadable_sync: " << op
->on_local_applied_sync
<< dendl
;
1985 ObjectStore::Transaction empty
;
1986 bool should_write_local
= false;
1987 ECSubWrite local_write_op
;
1988 for (set
<pg_shard_t
>::const_iterator i
=
1989 get_parent()->get_actingbackfill_shards().begin();
1990 i
!= get_parent()->get_actingbackfill_shards().end();
1992 op
->pending_apply
.insert(*i
);
1993 op
->pending_commit
.insert(*i
);
1994 map
<shard_id_t
, ObjectStore::Transaction
>::iterator iter
=
1995 trans
.find(i
->shard
);
1996 assert(iter
!= trans
.end());
1997 bool should_send
= get_parent()->should_send_op(*i
, op
->hoid
);
1998 const pg_stat_t
&stats
=
2001 parent
->get_shard_info().find(*i
)->second
.stats
;
2004 get_parent()->whoami_shard(),
2009 should_send
? iter
->second
: empty
,
2012 op
->roll_forward_to
,
2014 op
->updated_hit_set_history
,
2019 ZTracer::Trace trace
;
2021 // initialize a child span for this shard
2022 trace
.init("ec sub write", nullptr, &op
->trace
);
2023 trace
.keyval("shard", i
->shard
.id
);
2026 if (*i
== get_parent()->whoami_shard()) {
2027 should_write_local
= true;
2028 local_write_op
.claim(sop
);
2030 MOSDECSubOpWrite
*r
= new MOSDECSubOpWrite(sop
);
2031 r
->pgid
= spg_t(get_parent()->primary_spg_t().pgid
, i
->shard
);
2032 r
->map_epoch
= get_parent()->get_epoch();
2033 r
->min_epoch
= get_parent()->get_interval_start_epoch();
2035 get_parent()->send_message_osd_cluster(
2036 i
->osd
, r
, get_parent()->get_epoch());
2039 if (should_write_local
) {
2041 get_parent()->whoami_shard(),
2045 op
->on_local_applied_sync
);
2046 op
->on_local_applied_sync
= 0;
2049 for (auto i
= op
->on_write
.begin();
2050 i
!= op
->on_write
.end();
2051 op
->on_write
.erase(i
++)) {
2058 bool ECBackend::try_finish_rmw()
2060 if (waiting_commit
.empty())
2062 Op
*op
= &(waiting_commit
.front());
2063 if (op
->write_in_progress())
2065 waiting_commit
.pop_front();
2067 dout(10) << __func__
<< ": " << *op
<< dendl
;
2068 dout(20) << __func__
<< ": " << cache
<< dendl
;
2070 if (op
->roll_forward_to
> completed_to
)
2071 completed_to
= op
->roll_forward_to
;
2072 if (op
->version
> committed_to
)
2073 committed_to
= op
->version
;
2075 if (get_osdmap()->require_osd_release
>= CEPH_RELEASE_KRAKEN
) {
2076 if (op
->version
> get_parent()->get_log().get_can_rollback_to() &&
2077 waiting_reads
.empty() &&
2078 waiting_commit
.empty()) {
2079 // submit a dummy transaction to kick the rollforward
2080 auto tid
= get_parent()->get_tid();
2081 Op
*nop
= &(tid_to_op_map
[tid
]);
2082 nop
->hoid
= op
->hoid
;
2083 nop
->trim_to
= op
->trim_to
;
2084 nop
->roll_forward_to
= op
->version
;
2086 nop
->reqid
= op
->reqid
;
2087 waiting_reads
.push_back(*nop
);
2091 if (op
->using_cache
) {
2092 cache
.release_write_pin(op
->pin
);
2094 tid_to_op_map
.erase(op
->tid
);
2096 if (waiting_reads
.empty() &&
2097 waiting_commit
.empty()) {
2098 pipeline_state
.clear();
2099 dout(20) << __func__
<< ": clearing pipeline_state "
2106 void ECBackend::check_ops()
2108 while (try_state_to_reads() ||
2109 try_reads_to_commit() ||
2113 int ECBackend::objects_read_sync(
2114 const hobject_t
&hoid
,
2123 void ECBackend::objects_read_async(
2124 const hobject_t
&hoid
,
2125 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2126 pair
<bufferlist
*, Context
*> > > &to_read
,
2127 Context
*on_complete
,
2130 map
<hobject_t
,std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > >
2135 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2136 pair
<bufferlist
*, Context
*> > >::const_iterator i
=
2140 pair
<uint64_t, uint64_t> tmp
=
2141 sinfo
.offset_len_to_stripe_bounds(
2142 make_pair(i
->first
.get
<0>(), i
->first
.get
<1>()));
2145 esnew
.insert(tmp
.first
, tmp
.second
);
2147 flags
|= i
->first
.get
<2>();
2151 auto &offsets
= reads
[hoid
];
2152 for (auto j
= es
.begin();
2166 list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2167 pair
<bufferlist
*, Context
*> > > to_read
;
2168 unique_ptr
<Context
> on_complete
;
2169 cb(const cb
&) = delete;
2170 cb(cb
&&) = default;
2172 const hobject_t
&hoid
,
2173 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
2174 pair
<bufferlist
*, Context
*> > > &to_read
,
2175 Context
*on_complete
)
2179 on_complete(on_complete
) {}
2180 void operator()(map
<hobject_t
,pair
<int, extent_map
> > &&results
) {
2181 auto dpp
= ec
->get_parent()->get_dpp();
2182 ldpp_dout(dpp
, 20) << "objects_read_async_cb: got: " << results
2184 ldpp_dout(dpp
, 20) << "objects_read_async_cb: cache: " << ec
->cache
2187 auto &got
= results
[hoid
];
2190 for (auto &&read
: to_read
) {
2191 if (got
.first
< 0) {
2192 if (read
.second
.second
) {
2193 read
.second
.second
->complete(got
.first
);
2198 assert(read
.second
.first
);
2199 uint64_t offset
= read
.first
.get
<0>();
2200 uint64_t length
= read
.first
.get
<1>();
2201 auto range
= got
.second
.get_containing_range(offset
, length
);
2202 assert(range
.first
!= range
.second
);
2203 assert(range
.first
.get_off() <= offset
);
2205 (offset
+ length
) <=
2206 (range
.first
.get_off() + range
.first
.get_len()));
2207 read
.second
.first
->substr_of(
2208 range
.first
.get_val(),
2209 offset
- range
.first
.get_off(),
2211 if (read
.second
.second
) {
2212 read
.second
.second
->complete(length
);
2213 read
.second
.second
= nullptr;
2219 on_complete
.release()->complete(r
);
2223 for (auto &&i
: to_read
) {
2224 delete i
.second
.second
;
2229 objects_read_and_reconstruct(
2232 make_gen_lambda_context
<
2233 map
<hobject_t
,pair
<int, extent_map
> > &&, cb
>(
2240 struct CallClientContexts
:
2241 public GenContext
<pair
<RecoveryMessages
*, ECBackend::read_result_t
& > &> {
2244 ECBackend::ClientAsyncReadStatus
*status
;
2245 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > to_read
;
2249 ECBackend::ClientAsyncReadStatus
*status
,
2250 const list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > &to_read
)
2251 : hoid(hoid
), ec(ec
), status(status
), to_read(to_read
) {}
2252 void finish(pair
<RecoveryMessages
*, ECBackend::read_result_t
&> &in
) override
{
2253 ECBackend::read_result_t
&res
= in
.second
;
2257 assert(res
.returned
.size() == to_read
.size());
2259 assert(res
.errors
.empty());
2260 for (auto &&read
: to_read
) {
2261 pair
<uint64_t, uint64_t> adjusted
=
2262 ec
->sinfo
.offset_len_to_stripe_bounds(
2263 make_pair(read
.get
<0>(), read
.get
<1>()));
2264 assert(res
.returned
.front().get
<0>() == adjusted
.first
&&
2265 res
.returned
.front().get
<1>() == adjusted
.second
);
2266 map
<int, bufferlist
> to_decode
;
2268 for (map
<pg_shard_t
, bufferlist
>::iterator j
=
2269 res
.returned
.front().get
<2>().begin();
2270 j
!= res
.returned
.front().get
<2>().end();
2272 to_decode
[j
->first
.shard
].claim(j
->second
);
2274 int r
= ECUtil::decode(
2286 read
.get
<0>() - adjusted
.first
,
2288 bl
.length() - (read
.get
<0>() - adjusted
.first
)));
2290 read
.get
<0>(), trimmed
.length(), std::move(trimmed
));
2291 res
.returned
.pop_front();
2294 status
->complete_object(hoid
, res
.r
, std::move(result
));
2299 void ECBackend::objects_read_and_reconstruct(
2300 const map
<hobject_t
,
2301 std::list
<boost::tuple
<uint64_t, uint64_t, uint32_t> >
2304 GenContextURef
<map
<hobject_t
,pair
<int, extent_map
> > &&> &&func
)
2306 in_progress_client_reads
.emplace_back(
2307 reads
.size(), std::move(func
));
2308 if (!reads
.size()) {
2313 map
<hobject_t
, set
<int>> obj_want_to_read
;
2314 set
<int> want_to_read
;
2315 get_want_to_read_shards(&want_to_read
);
2317 map
<hobject_t
, read_request_t
> for_read_op
;
2318 for (auto &&to_read
: reads
) {
2319 set
<pg_shard_t
> shards
;
2320 int r
= get_min_avail_to_read_shards(
2328 CallClientContexts
*c
= new CallClientContexts(
2331 &(in_progress_client_reads
.back()),
2341 obj_want_to_read
.insert(make_pair(to_read
.first
, want_to_read
));
2345 CEPH_MSG_PRIO_DEFAULT
,
2354 int ECBackend::send_all_remaining_reads(
2355 const hobject_t
&hoid
,
2358 set
<int> already_read
;
2359 const set
<pg_shard_t
>& ots
= rop
.obj_to_source
[hoid
];
2360 for (set
<pg_shard_t
>::iterator i
= ots
.begin(); i
!= ots
.end(); ++i
)
2361 already_read
.insert(i
->shard
);
2362 dout(10) << __func__
<< " have/error shards=" << already_read
<< dendl
;
2363 set
<pg_shard_t
> shards
;
2364 int r
= get_remaining_shards(hoid
, already_read
, rop
.want_to_read
[hoid
],
2365 rop
.complete
[hoid
], &shards
, rop
.for_recovery
);
2369 list
<boost::tuple
<uint64_t, uint64_t, uint32_t> > offsets
=
2370 rop
.to_read
.find(hoid
)->second
.to_read
;
2371 GenContext
<pair
<RecoveryMessages
*, read_result_t
& > &> *c
=
2372 rop
.to_read
.find(hoid
)->second
.cb
;
2374 rop
.to_read
.erase(hoid
);
2375 rop
.to_read
.insert(make_pair(
2386 int ECBackend::objects_get_attrs(
2387 const hobject_t
&hoid
,
2388 map
<string
, bufferlist
> *out
)
2390 int r
= store
->getattrs(
2392 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2397 for (map
<string
, bufferlist
>::iterator i
= out
->begin();
2400 if (ECUtil::is_hinfo_key_string(i
->first
))
2408 void ECBackend::rollback_append(
2409 const hobject_t
&hoid
,
2411 ObjectStore::Transaction
*t
)
2413 assert(old_size
% sinfo
.get_stripe_width() == 0);
2416 ghobject_t(hoid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2417 sinfo
.aligned_logical_offset_to_chunk_offset(
2421 int ECBackend::be_deep_scrub(
2422 const hobject_t
&poid
,
2424 ScrubMapBuilder
&pos
,
2425 ScrubMap::object
&o
)
2427 dout(10) << __func__
<< " " << poid
<< " pos " << pos
<< dendl
;
2430 uint32_t fadvise_flags
= CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
|
2431 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
;
2434 sleeptime
.set_from_double(cct
->_conf
->osd_debug_deep_scrub_sleep
);
2435 if (sleeptime
!= utime_t()) {
2436 lgeneric_derr(cct
) << __func__
<< " sleeping for " << sleeptime
<< dendl
;
2440 if (pos
.data_pos
== 0) {
2441 pos
.data_hash
= bufferhash(-1);
2444 uint64_t stride
= cct
->_conf
->osd_deep_scrub_stride
;
2445 if (stride
% sinfo
.get_chunk_size())
2446 stride
+= sinfo
.get_chunk_size() - (stride
% sinfo
.get_chunk_size());
2452 poid
, ghobject_t::NO_GEN
, get_parent()->whoami_shard().shard
),
2457 dout(20) << __func__
<< " " << poid
<< " got "
2458 << r
<< " on read, read_error" << dendl
;
2459 o
.read_error
= true;
2462 if (bl
.length() % sinfo
.get_chunk_size()) {
2463 dout(20) << __func__
<< " " << poid
<< " got "
2464 << r
<< " on read, not chunk size " << sinfo
.get_chunk_size() << " aligned"
2466 o
.read_error
= true;
2470 pos
.data_hash
<< bl
;
2473 if (r
== (int)stride
) {
2474 return -EINPROGRESS
;
2477 ECUtil::HashInfoRef hinfo
= get_hash_info(poid
, false, &o
.attrs
);
2479 dout(0) << "_scan_list " << poid
<< " could not retrieve hash info" << dendl
;
2480 o
.read_error
= true;
2481 o
.digest_present
= false;
2484 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2485 assert(hinfo
->has_chunk_hash());
2486 if (hinfo
->get_total_chunk_size() != (unsigned)pos
.data_pos
) {
2487 dout(0) << "_scan_list " << poid
<< " got incorrect size on read 0x"
2489 << " expected 0x" << hinfo
->get_total_chunk_size() << std::dec
2491 o
.ec_size_mismatch
= true;
2495 if (hinfo
->get_chunk_hash(get_parent()->whoami_shard().shard
) !=
2496 pos
.data_hash
.digest()) {
2497 dout(0) << "_scan_list " << poid
<< " got incorrect hash on read 0x"
2498 << std::hex
<< pos
.data_hash
.digest() << " != expected 0x"
2499 << hinfo
->get_chunk_hash(get_parent()->whoami_shard().shard
)
2500 << std::dec
<< dendl
;
2501 o
.ec_hash_mismatch
= true;
2505 /* We checked above that we match our own stored hash. We cannot
2506 * send a hash of the actual object, so instead we simply send
2507 * our locally stored hash of shard 0 on the assumption that if
2508 * we match our chunk hash and our recollection of the hash for
2509 * chunk 0 matches that of our peers, there is likely no corruption.
2511 o
.digest
= hinfo
->get_chunk_hash(0);
2512 o
.digest_present
= true;
2514 /* Hack! We must be using partial overwrites, and partial overwrites
2515 * don't support deep-scrub yet
2518 o
.digest_present
= true;
2523 o
.omap_digest_present
= true;