// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "ECBackend.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "ECMsgTypes.h"
#include "PrimaryLogPG.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
  list<ECBackend::RecoveryOp> ops;
};
ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
  switch (rhs.pipeline_state) {
  case ECBackend::pipeline_state_t::CACHE_VALID:
    return lhs << "CACHE_VALID";
  case ECBackend::pipeline_state_t::CACHE_INVALID:
    return lhs << "CACHE_INVALID";
  default:
    assert(0 == "invalid pipeline state");
  }
  return lhs; // unreachable
}
static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
  lhs << "[";
  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
{
  lhs << "[";
  for (map<int, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(
  ostream &lhs,
  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
{
  return lhs << "(" << rhs.get<0>() << ", "
             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
}
ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
{
  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
             << ", need=" << rhs.need
             << ", want_attrs=" << rhs.want_attrs
             << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
{
  lhs << "read_result_t(r=" << rhs.r
      << ", errors=" << rhs.errors;
  if (rhs.attrs) {
    lhs << ", attrs=" << rhs.attrs.get();
  } else {
    lhs << ", noattrs";
  }
  return lhs << ", returned=" << rhs.returned << ")";
}
ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
{
  lhs << "ReadOp(tid=" << rhs.tid;
  if (rhs.op && rhs.op->get_req()) {
    lhs << ", op=";
    rhs.op->get_req()->print(lhs);
  }
  return lhs << ", to_read=" << rhs.to_read
             << ", complete=" << rhs.complete
             << ", priority=" << rhs.priority
             << ", obj_to_source=" << rhs.obj_to_source
             << ", source_to_obj=" << rhs.source_to_obj
             << ", in_progress=" << rhs.in_progress << ")";
}

void ECBackend::ReadOp::dump(Formatter *f) const
{
  f->dump_unsigned("tid", tid);
  if (op && op->get_req()) {
    f->dump_stream("op") << *(op->get_req());
  }
  f->dump_stream("to_read") << to_read;
  f->dump_stream("complete") << complete;
  f->dump_int("priority", priority);
  f->dump_stream("obj_to_source") << obj_to_source;
  f->dump_stream("source_to_obj") << source_to_obj;
  f->dump_stream("in_progress") << in_progress;
}
ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
{
  lhs << "Op(" << rhs.hoid
      << " v=" << rhs.version
      << " tt=" << rhs.trim_to
      << " tid=" << rhs.tid
      << " reqid=" << rhs.reqid;
  if (rhs.client_op && rhs.client_op->get_req()) {
    lhs << " client_op=";
    rhs.client_op->get_req()->print(lhs);
  }
  lhs << " roll_forward_to=" << rhs.roll_forward_to
      << " temp_added=" << rhs.temp_added
      << " temp_cleared=" << rhs.temp_cleared
      << " pending_read=" << rhs.pending_read
      << " remote_read=" << rhs.remote_read
      << " remote_read_result=" << rhs.remote_read_result
      << " pending_apply=" << rhs.pending_apply
      << " pending_commit=" << rhs.pending_commit
      << " plan.to_read=" << rhs.plan.to_read
      << " plan.will_write=" << rhs.plan.will_write
      << ")";
  return lhs;
}
ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
{
  return lhs << "RecoveryOp("
             << "hoid=" << rhs.hoid
             << " v=" << rhs.v
             << " missing_on=" << rhs.missing_on
             << " missing_on_shards=" << rhs.missing_on_shards
             << " recovery_info=" << rhs.recovery_info
             << " recovery_progress=" << rhs.recovery_progress
             << " obc refcount=" << rhs.obc.use_count()
             << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
             << " waiting_on_pushes=" << rhs.waiting_on_pushes
             << " extent_requested=" << rhs.extent_requested
             << ")";
}

void ECBackend::RecoveryOp::dump(Formatter *f) const
{
  f->dump_stream("hoid") << hoid;
  f->dump_stream("v") << v;
  f->dump_stream("missing_on") << missing_on;
  f->dump_stream("missing_on_shards") << missing_on_shards;
  f->dump_stream("recovery_info") << recovery_info;
  f->dump_stream("recovery_progress") << recovery_progress;
  f->dump_stream("state") << tostr(state);
  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
  f->dump_stream("extent_requested") << extent_requested;
}
ECBackend::ECBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &ch,
  ObjectStore *store,
  CephContext *cct,
  ErasureCodeInterfaceRef ec_impl,
  uint64_t stripe_width)
  : PGBackend(cct, pg, store, coll, ch),
    ec_impl(ec_impl),
    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
  assert((ec_impl->get_data_chunk_count() *
          ec_impl->get_chunk_size(stripe_width)) == stripe_width);
}

PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
{
  return new ECRecoveryHandle;
}
void ECBackend::_failed_push(const hobject_t &hoid,
  pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
{
  ECBackend::read_result_t &res = in.second;
  dout(10) << __func__ << ": Read error " << hoid << " r="
           << res.r << " errors=" << res.errors << dendl;
  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
           << dendl;
  assert(recovery_ops.count(hoid));
  recovery_ops.erase(hoid);

  list<pg_shard_t> fl;
  for (auto&& i : res.errors) {
    fl.push_back(i.first);
  }
  get_parent()->failed_push(fl, hoid);
}
struct OnRecoveryReadComplete :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  ECBackend *pg;
  hobject_t hoid;
  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
    : pg(pg), hoid(hoid) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    if (!(res.r == 0 && res.errors.empty())) {
      pg->_failed_push(hoid, in);
      return;
    }
    assert(res.returned.size() == 1);
    pg->handle_recovery_read_complete(
      hoid,
      res.returned.back(),
      res.attrs,
      in.first);
  }
};
struct RecoveryMessages {
  map<hobject_t,
      ECBackend::read_request_t> reads;
  void read(
    ECBackend *ec,
    const hobject_t &hoid, uint64_t off, uint64_t len,
    const set<pg_shard_t> &need,
    bool attrs) {
    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    to_read.push_back(boost::make_tuple(off, len, 0));
    assert(!reads.count(hoid));
    reads.insert(
      make_pair(
        hoid,
        ECBackend::read_request_t(
          to_read,
          need,
          attrs,
          new OnRecoveryReadComplete(
            ec,
            hoid))));
  }

  map<pg_shard_t, vector<PushOp> > pushes;
  map<pg_shard_t, vector<PushReplyOp> > push_replies;
  ObjectStore::Transaction t;
  RecoveryMessages() {}
  ~RecoveryMessages() {}
};
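
// RecoveryMessages batches the side effects of one recovery pass -- sub-reads,
// PushOps, PushReplyOps, and a local transaction -- so they can be emitted by
// a single dispatch_recovery_messages() call instead of one message per
// object. A minimal usage sketch (the values are illustrative only):
//
//   RecoveryMessages m;
//   m.read(this, hoid, 0, get_recovery_chunk_size(), need, true);
//   dispatch_recovery_messages(m, priority);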
void ECBackend::handle_recovery_push(
  const PushOp &op,
  RecoveryMessages *m)
{
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }

  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
  ghobject_t tobj;
  if (oneshot) {
    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
  } else {
    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
                                                             op.version),
                      ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
    if (op.before_progress.first) {
      dout(10) << __func__ << ": Adding oid "
               << tobj.hobj << " in the temp collection" << dendl;
      add_temp_obj(tobj.hobj);
    }
  }

  if (op.before_progress.first) {
    m->t.remove(coll, tobj);
    m->t.touch(coll, tobj);
  }

  if (!op.data_included.empty()) {
    uint64_t start = op.data_included.range_start();
    uint64_t end = op.data_included.range_end();
    assert(op.data.length() == (end - start));

    m->t.write(
      coll,
      tobj,
      start,
      op.data.length(),
      op.data);
  } else {
    assert(op.data.length() == 0);
  }

  if (op.before_progress.first) {
    assert(op.attrset.count(string("_")));
    m->t.setattrs(
      coll,
      tobj,
      op.attrset);
  }

  if (op.after_progress.data_complete && !oneshot) {
    dout(10) << __func__ << ": Removing oid "
             << tobj.hobj << " from the temp collection" << dendl;
    clear_temp_obj(tobj.hobj);
    m->t.remove(coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
    m->t.collection_move_rename(
      coll, tobj,
      coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  }
  if (op.after_progress.data_complete) {
    if ((get_parent()->pgb_is_primary())) {
      assert(recovery_ops.count(op.soid));
      assert(recovery_ops[op.soid].obc);
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        recovery_ops[op.soid].obc,
        false,
        &m->t);
    } else {
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        ObjectContextRef(),
        false,
        &m->t);
    }
  }
  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
}
void ECBackend::handle_recovery_push_reply(
  const PushReplyOp &op,
  pg_shard_t from,
  RecoveryMessages *m)
{
  if (!recovery_ops.count(op.soid))
    return;
  RecoveryOp &rop = recovery_ops[op.soid];
  assert(rop.waiting_on_pushes.count(from));
  rop.waiting_on_pushes.erase(from);
  continue_recovery_op(rop, m);
}
void ECBackend::handle_recovery_read_complete(
  const hobject_t &hoid,
  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
  boost::optional<map<string, bufferlist> > attrs,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": returned " << hoid << " "
           << "(" << to_read.get<0>()
           << ", " << to_read.get<1>()
           << ", " << to_read.get<2>()
           << ")"
           << dendl;
  assert(recovery_ops.count(hoid));
  RecoveryOp &op = recovery_ops[hoid];
  assert(op.returned_data.empty());
  map<int, bufferlist*> target;
  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
       i != op.missing_on_shards.end();
       ++i) {
    target[*i] = &(op.returned_data[*i]);
  }
  map<int, bufferlist> from;
  for (map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
       i != to_read.get<2>().end();
       ++i) {
    from[i->first.shard].claim(i->second);
  }
  dout(10) << __func__ << ": " << from << dendl;
  int r = ECUtil::decode(sinfo, ec_impl, from, target);
  assert(r == 0);
  if (attrs) {
    op.xattrs.swap(*attrs);

    if (!op.obc) {
      // The attrs still reference the original bufferlist (decoded from the
      // ECSubReadReply message), which is much larger than the attrs
      // themselves. If the obc caches them (get_obc may cache the attrs),
      // the whole original bufferlist cannot be freed until the obc is
      // evicted from the obc cache. So rebuild the bufferlists before
      // caching them.
      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
           it != op.xattrs.end();
           ++it) {
        it->second.rebuild();
      }
      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
      // of the backend (see bug #12983)
      map<string, bufferlist> sanitized_attrs(op.xattrs);
      sanitized_attrs.erase(ECUtil::get_hinfo_key());
      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
      assert(op.obc);
      op.recovery_info.size = op.obc->obs.oi.size;
      op.recovery_info.oi = op.obc->obs.oi;
    }

    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (op.obc->obs.oi.size > 0) {
      assert(op.xattrs.count(ECUtil::get_hinfo_key()));
      bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
      ::decode(hinfo, bp);
    }
    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  assert(op.xattrs.size());
  assert(op.obc);
  continue_recovery_op(op, m);
}
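
// ECUtil::decode() above rebuilds the missing shards from the surviving ones:
// `from` maps shard id -> chunk data read from a surviving OSD, and `target`
// maps each missing shard id to an output bufferlist. An illustrative sketch
// (shard counts depend on the pool's erasure profile; a k=2,m=1 profile
// missing shard 2 is assumed here):
//
//   map<int, bufferlist> from;    // {0: chunk0, 1: chunk1}
//   map<int, bufferlist*> target; // {2: &rebuilt_chunk2}
//   int r = ECUtil::decode(sinfo, ec_impl, from, target);  // r == 0 on success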
struct SendPushReplies : public Context {
  PGBackend::Listener *l;
  epoch_t epoch;
  map<int, MOSDPGPushReply*> replies;
  SendPushReplies(
    PGBackend::Listener *l,
    epoch_t epoch,
    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
    replies.swap(in);
  }
  void finish(int) override {
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      l->send_message_osd_cluster(i->first, i->second, epoch);
    }
    replies.clear();
  }
  ~SendPushReplies() override {
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      i->second->put();
    }
    replies.clear();
  }
};
void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
       i != m.pushes.end();
       m.pushes.erase(i++)) {
    MOSDPGPush *msg = new MOSDPGPush();
    msg->set_priority(priority);
    msg->map_epoch = get_parent()->get_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->pushes.swap(i->second);
    msg->compute_cost(cct);
    get_parent()->send_message(
      i->first.osd,
      msg);
  }
  map<int, MOSDPGPushReply*> replies;
  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
         m.push_replies.begin();
       i != m.push_replies.end();
       m.push_replies.erase(i++)) {
    MOSDPGPushReply *msg = new MOSDPGPushReply();
    msg->set_priority(priority);
    msg->map_epoch = get_parent()->get_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->replies.swap(i->second);
    msg->compute_cost(cct);
    replies.insert(make_pair(i->first.osd, msg));
  }

  if (!replies.empty()) {
    (m.t).register_on_complete(
      get_parent()->bless_context(
        new SendPushReplies(
          get_parent(),
          get_parent()->get_epoch(),
          replies)));
    get_parent()->queue_transaction(std::move(m.t));
  }
}
void ECBackend::continue_recovery_op(
  RecoveryOp &op,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": continuing " << op << dendl;
  while (1) {
    switch (op.state) {
    case RecoveryOp::IDLE: {
      // start read
      op.state = RecoveryOp::READING;
      assert(!op.recovery_progress.data_complete);
      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
      uint64_t from = op.recovery_progress.data_recovered_to;
      uint64_t amount = get_recovery_chunk_size();

      if (op.recovery_progress.first && op.obc) {
        /* We've got the attrs and the hinfo, might as well use them */
        op.hinfo = get_hash_info(op.hoid);
        assert(op.hinfo);
        op.xattrs = op.obc->attr_cache;
        ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
      }

      set<pg_shard_t> to_read;
      int r = get_min_avail_to_read_shards(
        op.hoid, want, true, false, &to_read);
      if (r != 0) {
        // we must have lost a recovery source
        assert(!op.recovery_progress.first);
        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
                 << dendl;
        get_parent()->cancel_pull(op.hoid);
        recovery_ops.erase(op.hoid);
        return;
      }
      m->read(
        this,
        op.hoid,
        op.recovery_progress.data_recovered_to,
        amount,
        to_read,
        op.recovery_progress.first && !op.obc);
      op.extent_requested = make_pair(
        from,
        amount);
      dout(10) << __func__ << ": IDLE return " << op << dendl;
      return;
    }
    case RecoveryOp::READING: {
      // read completed, start write
      assert(op.xattrs.size());
      assert(op.returned_data.size());
      op.state = RecoveryOp::WRITING;
      ObjectRecoveryProgress after_progress = op.recovery_progress;
      after_progress.data_recovered_to += op.extent_requested.second;
      after_progress.first = false;
      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
        after_progress.data_recovered_to =
          sinfo.logical_to_next_stripe_offset(
            op.obc->obs.oi.size);
        after_progress.data_complete = true;
      }
      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
           mi != op.missing_on.end();
           ++mi) {
        assert(op.returned_data.count(mi->shard));
        m->pushes[*mi].push_back(PushOp());
        PushOp &pop = m->pushes[*mi].back();
        pop.soid = op.hoid;
        pop.version = op.v;
        pop.data = op.returned_data[mi->shard];
        dout(10) << __func__ << ": before_progress=" << op.recovery_progress
                 << ", after_progress=" << after_progress
                 << ", pop.data.length()=" << pop.data.length()
                 << ", size=" << op.obc->obs.oi.size << dendl;
        assert(
          pop.data.length() ==
          sinfo.aligned_logical_offset_to_chunk_offset(
            after_progress.data_recovered_to -
            op.recovery_progress.data_recovered_to)
          );
        if (pop.data.length())
          pop.data_included.insert(
            sinfo.aligned_logical_offset_to_chunk_offset(
              op.recovery_progress.data_recovered_to),
            pop.data.length());
        if (op.recovery_progress.first) {
          pop.attrset = op.xattrs;
        }
        pop.recovery_info = op.recovery_info;
        pop.before_progress = op.recovery_progress;
        pop.after_progress = after_progress;
        if (*mi != get_parent()->primary_shard())
          get_parent()->begin_peer_recover(
            *mi,
            op.hoid);
      }
      op.returned_data.clear();
      op.waiting_on_pushes = op.missing_on;
      op.recovery_progress = after_progress;
      dout(10) << __func__ << ": READING return " << op << dendl;
      return;
    }
    case RecoveryOp::WRITING: {
      if (op.waiting_on_pushes.empty()) {
        if (op.recovery_progress.data_complete) {
          op.state = RecoveryOp::COMPLETE;
          for (set<pg_shard_t>::iterator i = op.missing_on.begin();
               i != op.missing_on.end();
               ++i) {
            if (*i != get_parent()->primary_shard()) {
              dout(10) << __func__ << ": on_peer_recover on " << *i
                       << ", obj " << op.hoid << dendl;
              get_parent()->on_peer_recover(
                *i,
                op.hoid,
                op.recovery_info);
            }
          }
          object_stat_sum_t stat;
          stat.num_bytes_recovered = op.recovery_info.size;
          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
          stat.num_objects_recovered = 1;
          get_parent()->on_global_recover(op.hoid, stat);
          dout(10) << __func__ << ": WRITING return " << op << dendl;
          recovery_ops.erase(op.hoid);
          return;
        } else {
          op.state = RecoveryOp::IDLE;
          dout(10) << __func__ << ": WRITING continue " << op << dendl;
          continue;
        }
      }
      return;
    }
    // should never be called once complete
    case RecoveryOp::COMPLETE:
    default: {
      assert(0);
    };
    }
  }
}
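
// Recovery of one object is a small state machine driven by
// continue_recovery_op():
//
//   IDLE    -> READING : issue reads of the next recovery chunk to a minimal
//                        set of surviving shards
//   READING -> WRITING : decode the missing shards and push them to the
//                        shards that lack them
//   WRITING -> IDLE    : more extents remain; loop for the next chunk
//   WRITING -> COMPLETE: all pushes acked and data_complete is set
//
// Each transition either queues work through RecoveryMessages and returns,
// or (WRITING -> IDLE) continues the loop for the next chunk.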
void ECBackend::run_recovery_op(
  RecoveryHandle *_h,
  int priority)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  RecoveryMessages m;
  for (list<RecoveryOp>::iterator i = h->ops.begin();
       i != h->ops.end();
       ++i) {
    dout(10) << __func__ << ": starting " << *i << dendl;
    assert(!recovery_ops.count(i->hoid));
    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
    continue_recovery_op(op, &m);
  }
  dispatch_recovery_messages(m, priority);
  delete _h;
}
void ECBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  h->ops.push_back(RecoveryOp());
  h->ops.back().v = v;
  h->ops.back().hoid = hoid;
  h->ops.back().obc = obc;
  h->ops.back().recovery_info.soid = hoid;
  h->ops.back().recovery_info.version = v;
  if (obc) {
    h->ops.back().recovery_info.size = obc->obs.oi.size;
    h->ops.back().recovery_info.oi = obc->obs.oi;
  }
  if (hoid.is_snap()) {
    if (obc) {
      assert(obc->ssc);
      h->ops.back().recovery_info.ss = obc->ssc->snapset;
    } else if (head) {
      assert(head->ssc);
      h->ops.back().recovery_info.ss = head->ssc->snapset;
    } else {
      assert(0 == "neither obc nor head set for a snap object");
    }
  }
  h->ops.back().recovery_progress.omap_complete = true;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_actingbackfill_shards().begin();
       i != get_parent()->get_actingbackfill_shards().end();
       ++i) {
    dout(10) << "checking " << *i << dendl;
    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
      h->ops.back().missing_on.insert(*i);
      h->ops.back().missing_on_shards.insert(i->shard);
    }
  }
  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
}
bool ECBackend::can_handle_while_inactive(
  OpRequestRef _op)
{
  return false;
}
bool ECBackend::handle_message(
  OpRequestRef _op)
{
  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
  int priority = _op->get_req()->get_priority();
  switch (_op->get_req()->get_type()) {
  case MSG_OSD_EC_WRITE: {
    // NOTE: this is non-const because handle_sub_write modifies the embedded
    // ObjectStore::Transaction in place (and then std::move's it).  It does
    // not conflict with ECSubWrite's operator<<.
    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
      _op->get_nonconst_req());
    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_WRITE_REPLY: {
    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
      _op->get_req());
    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_READ: {
    const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
    reply->pgid = get_parent()->primary_spg_t();
    reply->map_epoch = get_parent()->get_epoch();
    reply->min_epoch = get_parent()->get_interval_start_epoch();
    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
    reply->trace = _op->pg_trace;
    get_parent()->send_message_osd_cluster(
      op->op.from.osd, reply, get_parent()->get_epoch());
    return true;
  }
  case MSG_OSD_EC_READ_REPLY: {
    // NOTE: this is non-const because handle_sub_read_reply steals resulting
    // buffers.  It does not conflict with ECSubReadReply operator<<.
    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
      _op->get_nonconst_req());
    RecoveryMessages rm;
    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH: {
    const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
    RecoveryMessages rm;
    for (vector<PushOp>::const_iterator i = op->pushes.begin();
         i != op->pushes.end();
         ++i) {
      handle_recovery_push(*i, &rm);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH_REPLY: {
    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
      _op->get_req());
    RecoveryMessages rm;
    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
      handle_recovery_push_reply(*i, op->from, &rm);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  default:
    return false;
  }
  return false;
}
struct SubWriteCommitted : public Context {
  ECBackend *pg;
  OpRequestRef msg;
  ceph_tid_t tid;
  eversion_t version;
  eversion_t last_complete;
  const ZTracer::Trace trace;
  SubWriteCommitted(
    ECBackend *pg,
    OpRequestRef msg,
    ceph_tid_t tid,
    eversion_t version,
    eversion_t last_complete,
    const ZTracer::Trace &trace)
    : pg(pg), msg(msg), tid(tid),
      version(version), last_complete(last_complete), trace(trace) {}
  void finish(int) override {
    if (msg)
      msg->mark_event("sub_op_committed");
    pg->sub_write_committed(tid, version, last_complete, trace);
  }
};
void ECBackend::sub_write_committed(
  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
  const ZTracer::Trace &trace) {
  if (get_parent()->pgb_is_primary()) {
    ECSubWriteReply reply;
    reply.tid = tid;
    reply.last_complete = last_complete;
    reply.committed = true;
    reply.from = get_parent()->whoami_shard();
    handle_sub_write_reply(
      get_parent()->whoami_shard(),
      reply, trace);
  } else {
    get_parent()->update_last_complete_ondisk(last_complete);
    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
    r->pgid = get_parent()->primary_spg_t();
    r->map_epoch = get_parent()->get_epoch();
    r->min_epoch = get_parent()->get_interval_start_epoch();
    r->op.tid = tid;
    r->op.last_complete = last_complete;
    r->op.committed = true;
    r->op.from = get_parent()->whoami_shard();
    r->set_priority(CEPH_MSG_PRIO_HIGH);
    r->trace = trace;
    r->trace.event("sending sub op commit");
    get_parent()->send_message_osd_cluster(
      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
  }
}
struct SubWriteApplied : public Context {
  ECBackend *pg;
  OpRequestRef msg;
  ceph_tid_t tid;
  eversion_t version;
  const ZTracer::Trace trace;
  SubWriteApplied(
    ECBackend *pg,
    OpRequestRef msg,
    ceph_tid_t tid,
    eversion_t version,
    const ZTracer::Trace &trace)
    : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
  void finish(int) override {
    if (msg)
      msg->mark_event("sub_op_applied");
    pg->sub_write_applied(tid, version, trace);
  }
};
void ECBackend::sub_write_applied(
  ceph_tid_t tid, eversion_t version,
  const ZTracer::Trace &trace) {
  parent->op_applied(version);
  if (get_parent()->pgb_is_primary()) {
    ECSubWriteReply reply;
    reply.from = get_parent()->whoami_shard();
    reply.tid = tid;
    reply.applied = true;
    handle_sub_write_reply(
      get_parent()->whoami_shard(),
      reply, trace);
  } else {
    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
    r->pgid = get_parent()->primary_spg_t();
    r->map_epoch = get_parent()->get_epoch();
    r->min_epoch = get_parent()->get_interval_start_epoch();
    r->op.from = get_parent()->whoami_shard();
    r->op.tid = tid;
    r->op.applied = true;
    r->set_priority(CEPH_MSG_PRIO_HIGH);
    r->trace = trace;
    r->trace.event("sending sub op apply");
    get_parent()->send_message_osd_cluster(
      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
  }
}
void ECBackend::handle_sub_write(
  pg_shard_t from,
  OpRequestRef msg,
  ECSubWrite &op,
  const ZTracer::Trace &trace,
  Context *on_local_applied_sync)
{
  if (msg)
    msg->mark_started();
  trace.event("handle_sub_write");
  assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
  if (!get_parent()->pgb_is_primary())
    get_parent()->update_stats(op.stats);
  ObjectStore::Transaction localt;
  if (!op.temp_added.empty()) {
    add_temp_objs(op.temp_added);
  }
  if (op.backfill) {
    for (set<hobject_t>::iterator i = op.temp_removed.begin();
         i != op.temp_removed.end();
         ++i) {
      dout(10) << __func__ << ": removing object " << *i
               << " since we won't get the transaction" << dendl;
      localt.remove(
        coll,
        ghobject_t(
          *i,
          ghobject_t::NO_GEN,
          get_parent()->whoami_shard().shard));
    }
  }
  clear_temp_objs(op.temp_removed);
  get_parent()->log_operation(
    op.log_entries,
    op.updated_hit_set_history,
    op.trim_to,
    op.roll_forward_to,
    !op.backfill,
    localt);

  PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
  if (_rPG && !_rPG->is_undersized() &&
      (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  if (on_local_applied_sync) {
    dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
    localt.register_on_applied_sync(on_local_applied_sync);
  }
  localt.register_on_commit(
    get_parent()->bless_context(
      new SubWriteCommitted(
        this, msg, op.tid,
        op.at_version,
        get_parent()->get_info().last_complete, trace)));
  localt.register_on_applied(
    get_parent()->bless_context(
      new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(op.t));
  tls.push_back(std::move(localt));
  get_parent()->queue_transactions(tls, msg);
}
void ECBackend::handle_sub_read(
  pg_shard_t from,
  const ECSubRead &op,
  ECSubReadReply *reply,
  const ZTracer::Trace &trace)
{
  trace.event("handle sub read");
  shard_id_t shard = get_parent()->whoami_shard().shard;
  for (auto i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    int r = 0;
    ECUtil::HashInfoRef hinfo;
    if (!get_parent()->get_pool().allows_ecoverwrites()) {
      hinfo = get_hash_info(i->first);
      if (!hinfo) {
        r = -EIO;
        get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first;
        dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
        goto error;
      }
    }
    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
      bufferlist bl;
      r = store->read(
        ch,
        ghobject_t(i->first, ghobject_t::NO_GEN, shard),
        j->get<0>(),
        j->get<1>(),
        bl, j->get<2>(),
        true); // Allow EIO return
      if (r < 0) {
        get_parent()->clog_error() << __func__
                                   << ": Error " << r
                                   << " reading " << i->first;
        dout(5) << __func__ << ": Error " << r
                << " reading " << i->first << dendl;
        goto error;
      } else {
        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
        reply->buffers_read[i->first].push_back(
          make_pair(
            j->get<0>(),
            bl)
          );
      }

      if (!get_parent()->get_pool().allows_ecoverwrites()) {
        // This shows that we still need deep scrub: large enough objects
        // are read in sections, so the digest check can only be done when
        // the whole chunk is read at once.
        // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
        // the state of our chunk in case other chunks could substitute.
        assert(hinfo->has_chunk_hash());
        if ((bl.length() == hinfo->get_total_chunk_size()) &&
            (j->get<0>() == 0)) {
          dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
          bufferhash h(-1);
          h << bl;
          if (h.digest() != hinfo->get_chunk_hash(shard)) {
            get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
                                       << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
            dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
                    << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
            r = -EIO;
            goto error;
          }
        }
      }
    }
    continue;
error:
    // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
    // the state of our chunk in case other chunks could substitute.
    reply->buffers_read.erase(i->first);
    reply->errors[i->first] = r;
  }
  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
       i != op.attrs_to_read.end();
       ++i) {
    dout(10) << __func__ << ": fulfilling attr request on "
             << *i << dendl;
    if (reply->errors.count(*i))
      continue;
    int r = store->getattrs(
      ch,
      ghobject_t(
        *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      reply->attrs_read[*i]);
    if (r < 0) {
      reply->buffers_read.erase(*i);
      reply->errors[*i] = r;
    }
  }
  reply->from = get_parent()->whoami_shard();
  reply->tid = op.tid;
}
void ECBackend::handle_sub_write_reply(
  pg_shard_t from,
  const ECSubWriteReply &op,
  const ZTracer::Trace &trace)
{
  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
  assert(i != tid_to_op_map.end());
  if (op.committed) {
    trace.event("sub write committed");
    assert(i->second.pending_commit.count(from));
    i->second.pending_commit.erase(from);
    if (from != get_parent()->whoami_shard()) {
      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
    }
  }
  if (op.applied) {
    trace.event("sub write applied");
    assert(i->second.pending_apply.count(from));
    i->second.pending_apply.erase(from);
  }

  if (i->second.pending_apply.empty() && i->second.on_all_applied) {
    dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
    i->second.on_all_applied->complete(0);
    i->second.on_all_applied = 0;
    i->second.trace.event("ec write all applied");
  }
  if (i->second.pending_commit.empty() && i->second.on_all_commit) {
    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
    i->second.on_all_commit->complete(0);
    i->second.on_all_commit = 0;
    i->second.trace.event("ec write all committed");
  }
  check_ops();
}
void ECBackend::handle_sub_read_reply(
  pg_shard_t from,
  ECSubReadReply &op,
  RecoveryMessages *m,
  const ZTracer::Trace &trace)
{
  trace.event("ec sub read reply");
  dout(10) << __func__ << ": reply " << op << dendl;
  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
  if (iter == tid_to_read_map.end()) {
    // canceled
    dout(20) << __func__ << ": dropped " << op << dendl;
    return;
  }
  ReadOp &rop = iter->second;
  for (auto i = op.buffers_read.begin();
       i != op.buffers_read.end();
       ++i) {
    assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
      rop.to_read.find(i->first)->second.to_read.begin();
    list<
      boost::tuple<
        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
      rop.complete[i->first].returned.begin();
    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
         j != i->second.end();
         ++j, ++req_iter, ++riter) {
      assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
      assert(riter != rop.complete[i->first].returned.end());
      pair<uint64_t, uint64_t> adjusted =
        sinfo.aligned_offset_len_to_chunk(
          make_pair(req_iter->get<0>(), req_iter->get<1>()));
      assert(adjusted.first == j->first);
      riter->get<2>()[from].claim(j->second);
    }
  }
  for (auto i = op.attrs_read.begin();
       i != op.attrs_read.end();
       ++i) {
    assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    rop.complete[i->first].attrs = map<string, bufferlist>();
    (*(rop.complete[i->first].attrs)).swap(i->second);
  }
  for (auto i = op.errors.begin();
       i != op.errors.end();
       ++i) {
    rop.complete[i->first].errors.insert(
      make_pair(
        from,
        i->second));
    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
  }

  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
    shard_to_read_map.find(from);
  assert(siter != shard_to_read_map.end());
  assert(siter->second.count(op.tid));
  siter->second.erase(op.tid);

  assert(rop.in_progress.count(from));
  rop.in_progress.erase(from);
  unsigned is_complete = 0;
  // For redundant reads check for completion as each shard comes in,
  // or in a non-recovery read check for completion once all the shards read.
  // TODO: It would be nice if recovery could send more reads too
  if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
    for (map<hobject_t, read_result_t>::const_iterator iter =
           rop.complete.begin();
         iter != rop.complete.end();
         ++iter) {
      set<int> have;
      for (map<pg_shard_t, bufferlist>::const_iterator j =
             iter->second.returned.front().get<2>().begin();
           j != iter->second.returned.front().get<2>().end();
           ++j) {
        have.insert(j->first.shard);
        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
      }
      set<int> want_to_read, dummy_minimum;
      get_want_to_read_shards(&want_to_read);
      int err;
      if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
        dout(20) << __func__ << " minimum_to_decode failed" << dendl;
        if (rop.in_progress.empty()) {
          // If we don't have enough copies and we haven't sent reads for all shards
          // we can send the rest of the reads, if any.
          if (!rop.do_redundant_reads) {
            int r = send_all_remaining_reads(iter->first, rop);
            if (r == 0) {
              // We added to in_progress and not incrementing is_complete
              continue;
            }
          }
          // Couldn't read any additional shards so handle as completed with errors
          if (rop.complete[iter->first].errors.empty()) {
            dout(20) << __func__ << " simply not enough copies err=" << err << dendl;
          } else {
            // Grab the first error
            err = rop.complete[iter->first].errors.begin()->second;
            dout(20) << __func__ << ": Use one of the shard errors err=" << err << dendl;
          }
          rop.complete[iter->first].r = err;
          ++is_complete;
        }
      } else {
        assert(rop.complete[iter->first].r == 0);
        if (!rop.complete[iter->first].errors.empty()) {
          if (cct->_conf->osd_read_ec_check_for_errors) {
            dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
            err = rop.complete[iter->first].errors.begin()->second;
            rop.complete[iter->first].r = err;
          } else {
            get_parent()->clog_error() << __func__ << ": Error(s) ignored for "
                                       << iter->first << " enough copies available";
            dout(10) << __func__ << " Error(s) ignored for " << iter->first
                     << " enough copies available" << dendl;
            rop.complete[iter->first].errors.clear();
          }
        }
        ++is_complete;
      }
    }
  }
  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
    dout(20) << __func__ << " Complete: " << rop << dendl;
    rop.trace.event("ec read complete");
    complete_read_op(rop, m);
  } else {
    dout(10) << __func__ << " readop not complete: " << rop << dendl;
  }
}
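
// Completion policy, restated: a redundant read can complete as soon as any
// decodable subset of shards has arrived, while a normal read waits until
// in_progress drains. If minimum_to_decode() still fails at that point, the
// op either fans out to the remaining shards (send_all_remaining_reads) or
// completes with the first shard error, falling back to the
// minimum_to_decode() error when no shard reported one.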
void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
{
  map<hobject_t, read_request_t>::iterator reqiter =
    rop.to_read.begin();
  map<hobject_t, read_result_t>::iterator resiter =
    rop.complete.begin();
  assert(rop.to_read.size() == rop.complete.size());
  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
    if (reqiter->second.cb) {
      pair<RecoveryMessages *, read_result_t &> arg(
        m, resiter->second);
      reqiter->second.cb->complete(arg);
      reqiter->second.cb = NULL;
    }
  }
  tid_to_read_map.erase(rop.tid);
}
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
  ECBackend *ec;
  ceph_tid_t tid;
  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
  void finish(ThreadPool::TPHandle &handle) override {
    auto ropiter = ec->tid_to_read_map.find(tid);
    assert(ropiter != ec->tid_to_read_map.end());
    int priority = ropiter->second.priority;
    RecoveryMessages rm;
    ec->complete_read_op(ropiter->second, &rm);
    ec->dispatch_recovery_messages(rm, priority);
  }
};
void ECBackend::filter_read_op(
  const OSDMapRef& osdmap,
  ReadOp &op)
{
  set<hobject_t> to_cancel;
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ++i) {
    if (osdmap->is_down(i->first.osd)) {
      to_cancel.insert(i->second.begin(), i->second.end());
      op.in_progress.erase(i->first);
    }
  }

  if (to_cancel.empty())
    return;

  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ) {
    for (set<hobject_t>::iterator j = i->second.begin();
         j != i->second.end();
         ) {
      if (to_cancel.count(*j))
        i->second.erase(j++);
      else
        ++j;
    }
    if (i->second.empty()) {
      op.source_to_obj.erase(i++);
    } else {
      assert(!osdmap->is_down(i->first.osd));
      ++i;
    }
  }

  for (set<hobject_t>::iterator i = to_cancel.begin();
       i != to_cancel.end();
       ++i) {
    get_parent()->cancel_pull(*i);

    assert(op.to_read.count(*i));
    read_request_t &req = op.to_read.find(*i)->second;
    dout(10) << __func__ << ": canceling " << req
             << " for obj " << *i << dendl;
    assert(req.cb);
    delete req.cb;
    req.cb = NULL;

    op.to_read.erase(*i);
    op.complete.erase(*i);
    recovery_ops.erase(*i);
  }

  if (op.in_progress.empty()) {
    get_parent()->schedule_recovery_work(
      get_parent()->bless_gencontext(
        new FinishReadOp(this, op.tid)));
  }
}
void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  set<ceph_tid_t> tids_to_filter;
  for (map<pg_shard_t, set<ceph_tid_t> >::iterator
         i = shard_to_read_map.begin();
       i != shard_to_read_map.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      tids_to_filter.insert(i->second.begin(), i->second.end());
      shard_to_read_map.erase(i++);
    } else {
      ++i;
    }
  }
  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
       i != tids_to_filter.end();
       ++i) {
    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
    assert(j != tid_to_read_map.end());
    filter_read_op(osdmap, j->second);
  }
}
void ECBackend::on_change()
{
  dout(10) << __func__ << dendl;

  completed_to = eversion_t();
  committed_to = eversion_t();
  pipeline_state.clear();
  waiting_reads.clear();
  waiting_state.clear();
  waiting_commit.clear();
  for (auto &&op: tid_to_op_map) {
    cache.release_write_pin(op.second.pin);
  }
  tid_to_op_map.clear();

  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    dout(10) << __func__ << ": cancelling " << i->second << dendl;
    for (map<hobject_t, read_request_t>::iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      delete j->second.cb;
      j->second.cb = 0;
    }
  }
  tid_to_read_map.clear();
  in_progress_client_reads.clear();
  shard_to_read_map.clear();
  clear_recovery_state();
}

void ECBackend::clear_recovery_state()
{
  recovery_ops.clear();
}

void ECBackend::on_flushed()
{
}
void ECBackend::dump_recovery_info(Formatter *f) const
{
  f->open_array_section("recovery_ops");
  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
       i != recovery_ops.end();
       ++i) {
    f->open_object_section("op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
  f->open_array_section("read_ops");
  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    f->open_object_section("read_op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
}
void ECBackend::submit_transaction(
  const hobject_t &hoid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_applied,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef client_op
  )
{
  assert(!tid_to_op_map.count(tid));
  Op *op = &(tid_to_op_map[tid]);
  op->hoid = hoid;
  op->delta_stats = delta_stats;
  op->version = at_version;
  op->trim_to = trim_to;
  op->roll_forward_to = MAX(roll_forward_to, committed_to);
  op->log_entries = log_entries;
  std::swap(op->updated_hit_set_history, hset_history);
  op->on_local_applied_sync = on_local_applied_sync;
  op->on_all_applied = on_all_applied;
  op->on_all_commit = on_all_commit;
  op->tid = tid;
  op->reqid = reqid;
  op->client_op = client_op;
  if (client_op)
    op->trace = client_op->pg_trace;

  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
  start_rmw(op, std::move(t));
  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
}
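
// submit_transaction() only enqueues the Op; the real work happens in the
// three-stage write pipeline driven by check_ops():
//
//   waiting_state  --try_state_to_reads()--->  waiting_reads
//   waiting_reads  --try_reads_to_commit()-->  waiting_commit
//   waiting_commit --try_finish_rmw()------->  done
//
// The stages are strictly ordered per PG, which is what keeps EC overwrites
// (read-modify-write cycles) serialized against one another.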
void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
  if (!waiting_state.empty()) {
    waiting_state.back().on_write.emplace_back(std::move(cb));
  } else if (!waiting_reads.empty()) {
    waiting_reads.back().on_write.emplace_back(std::move(cb));
  } else {
    // Nothing earlier in the pipeline, just call it
    cb();
  }
}
int ECBackend::get_min_avail_to_read_shards(
  const hobject_t &hoid,
  const set<int> &want,
  bool for_recovery,
  bool do_redundant_reads,
  set<pg_shard_t> *to_read)
{
  // Make sure we don't do redundant reads for recovery
  assert(!for_recovery || !do_redundant_reads);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;

  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (!missing.is_missing(hoid)) {
      assert(!have.count(i->shard));
      have.insert(i->shard);
      assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (for_recovery) {
    for (set<pg_shard_t>::const_iterator i =
           get_parent()->get_backfill_shards().begin();
         i != get_parent()->get_backfill_shards().end();
         ++i) {
      if (have.count(i->shard)) {
        assert(shards.count(i->shard));
        continue;
      }
      dout(10) << __func__ << ": checking backfill " << *i << dendl;
      assert(!shards.count(i->shard));
      const pg_info_t &info = get_parent()->get_shard_info(*i);
      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
      if (hoid < info.last_backfill &&
          !missing.is_missing(hoid)) {
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }

    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
      get_parent()->get_missing_loc_shards().find(hoid);
    if (miter != get_parent()->get_missing_loc_shards().end()) {
      for (set<pg_shard_t>::iterator i = miter->second.begin();
           i != miter->second.end();
           ++i) {
        dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
        auto m = get_parent()->maybe_get_shard_missing(*i);
        if (m) {
          assert(!(*m).is_missing(hoid));
        }
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }
  }

  set<int> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0)
    return r;

  if (do_redundant_reads) {
    need.swap(have);
  }

  if (!to_read)
    return 0;

  for (set<int>::iterator i = need.begin();
       i != need.end();
       ++i) {
    assert(shards.count(shard_id_t(*i)));
    to_read->insert(shards[shard_id_t(*i)]);
  }
  return 0;
}
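
// minimum_to_decode(want, have, &need) asks the erasure plugin for a minimal
// subset of `have` sufficient to reconstruct the `want` shards. As an
// illustrative example (the exact need-set is plugin- and profile-dependent;
// a k=2,m=1 profile is assumed): want={0,1}, have={0,2} can yield
// need={0,2}, rebuilding shard 1 from data shard 0 plus coding shard 2. It
// returns a negative error only when no decodable subset exists.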
int ECBackend::get_remaining_shards(
  const hobject_t &hoid,
  const set<int> &avail,
  set<pg_shard_t> *to_read)
{
  set<int> need;
  map<shard_id_t, pg_shard_t> shards;

  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (!missing.is_missing(hoid)) {
      assert(!need.count(i->shard));
      need.insert(i->shard);
      assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (!to_read)
    return 0;

  for (set<int>::iterator i = need.begin();
       i != need.end();
       ++i) {
    assert(shards.count(shard_id_t(*i)));
    if (avail.find(*i) == avail.end())
      to_read->insert(shards[shard_id_t(*i)]);
  }
  return 0;
}
void ECBackend::start_read_op(
  int priority,
  map<hobject_t, read_request_t> &to_read,
  OpRequestRef _op,
  bool do_redundant_reads,
  bool for_recovery)
{
  ceph_tid_t tid = get_parent()->get_tid();
  assert(!tid_to_read_map.count(tid));
  auto &op = tid_to_read_map.emplace(
    tid,
    ReadOp(
      priority,
      tid,
      do_redundant_reads,
      for_recovery,
      _op,
      std::move(to_read))).first->second;
  dout(10) << __func__ << ": starting " << op << dendl;
  if (_op) {
    op.trace = _op->pg_trace;
    op.trace.event("start ec read");
  }
  do_read_op(op);
}
void ECBackend::do_read_op(ReadOp &op)
{
  int priority = op.priority;
  ceph_tid_t tid = op.tid;

  dout(10) << __func__ << ": starting read " << op << dendl;

  map<pg_shard_t, ECSubRead> messages;
  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    bool need_attrs = i->second.want_attrs;
    for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
         j != i->second.need.end();
         ++j) {
      if (need_attrs) {
        messages[*j].attrs_to_read.insert(i->first);
        need_attrs = false;
      }
      op.obj_to_source[i->first].insert(*j);
      op.source_to_obj[*j].insert(i->first);
    }
    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      pair<uint64_t, uint64_t> chunk_off_len =
        sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
      for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
           k != i->second.need.end();
           ++k) {
        messages[*k].to_read[i->first].push_back(
          boost::make_tuple(
            chunk_off_len.first,
            chunk_off_len.second,
            j->get<2>()));
      }
      assert(!need_attrs);
    }
  }

  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
       i != messages.end();
       ++i) {
    op.in_progress.insert(i->first);
    shard_to_read_map[i->first].insert(op.tid);
    i->second.tid = tid;
    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
    msg->set_priority(priority);
    msg->pgid = spg_t(
      get_parent()->whoami_spg_t().pgid,
      i->first.shard);
    msg->map_epoch = get_parent()->get_epoch();
    msg->min_epoch = get_parent()->get_interval_start_epoch();
    msg->op = i->second;
    msg->op.from = get_parent()->whoami_shard();
    msg->op.tid = tid;
    if (op.trace) {
      // initialize a child span for this shard
      msg->trace.init("ec sub read", nullptr, &op.trace);
      msg->trace.keyval("shard", i->first.shard.id);
    }
    get_parent()->send_message_osd_cluster(
      i->first.osd,
      msg,
      get_parent()->get_epoch());
  }
  dout(10) << __func__ << ": started " << op << dendl;
}
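
// aligned_offset_len_to_chunk() converts a stripe-aligned logical extent into
// the per-shard extent that must be read from every needed shard. A worked
// example with an assumed 4+2 profile and stripe_width = 16384 (so
// chunk_size = 16384 / 4 = 4096): the logical extent (32768, 16384) covers
// stripe 2 only, and maps to the chunk extent (8192, 4096) on each shard.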
ECUtil::HashInfoRef ECBackend::get_hash_info(
  const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
{
  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
  if (!ref) {
    dout(10) << __func__ << ": not in cache " << hoid << dendl;
    struct stat st;
    int r = store->stat(
      ch,
      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &st);
    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    // XXX: What does it mean if there is no object on disk?
    if (r >= 0) {
      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
      bufferlist bl;
      if (attrs) {
        map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
        if (k == attrs->end()) {
          dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
        } else {
          bl.push_back(k->second);
        }
      } else {
        r = store->getattr(
          ch,
          ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
          ECUtil::get_hinfo_key(),
          bl);
        if (r < 0) {
          dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
          bl.clear(); // just in case
        }
      }
      if (bl.length() > 0) {
        bufferlist::iterator bp = bl.begin();
        ::decode(hinfo, bp);
        if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
          dout(0) << __func__ << ": Mismatch of total_chunk_size "
                  << hinfo.get_total_chunk_size() << dendl;
          return ECUtil::HashInfoRef();
        }
      } else if (st.st_size > 0) { // If empty object and no hinfo, create it
        return ECUtil::HashInfoRef();
      }
    }
    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  return ref;
}
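
// The hinfo xattr (ECUtil::get_hinfo_key()) stores the cumulative per-shard
// chunk hashes and the total chunk size for append-only EC pools; it is what
// handle_sub_read() verifies against and what recovery re-encodes. Callers
// treat a null HashInfoRef from this function as an unrecoverable
// inconsistency on this shard.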
void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
{
  assert(op);

  op->plan = ECTransaction::get_write_plan(
    sinfo,
    std::move(t),
    [&](const hobject_t &i) {
      ECUtil::HashInfoRef ref = get_hash_info(i, false);
      if (!ref) {
        derr << __func__ << ": get_hash_info(" << i << ")"
             << " returned a null pointer and there is no "
             << " way to recover from such an error in this "
             << " context" << dendl;
        ceph_abort();
      }
      return ref;
    },
    get_parent()->get_dpp());

  dout(10) << __func__ << ": " << *op << dendl;

  waiting_state.push_back(*op);
  check_ops();
}
bool ECBackend::try_state_to_reads()
{
  if (waiting_state.empty())
    return false;

  Op *op = &(waiting_state.front());
  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
    assert(get_parent()->get_pool().allows_ecoverwrites());
    dout(20) << __func__ << ": blocking " << *op
             << " because it requires an rmw and the cache is invalid "
             << pipeline_state
             << dendl;
    return false;
  }

  if (op->invalidates_cache()) {
    dout(20) << __func__ << ": invalidating cache after this op"
             << dendl;
    pipeline_state.invalidate();
    op->using_cache = false;
  } else {
    op->using_cache = pipeline_state.caching_enabled();
  }

  waiting_state.pop_front();
  waiting_reads.push_back(*op);

  if (op->using_cache) {
    cache.open_write_pin(op->pin);

    extent_set empty;
    for (auto &&hpair: op->plan.will_write) {
      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
      const extent_set &to_read_plan =
        to_read_plan_iter == op->plan.to_read.end() ?
        empty :
        to_read_plan_iter->second;

      extent_set remote_read = cache.reserve_extents_for_rmw(
        hpair.first,
        op->pin,
        hpair.second,
        to_read_plan);

      extent_set pending_read = to_read_plan;
      pending_read.subtract(remote_read);

      if (!remote_read.empty()) {
        op->remote_read[hpair.first] = std::move(remote_read);
      }
      if (!pending_read.empty()) {
        op->pending_read[hpair.first] = std::move(pending_read);
      }
    }
  } else {
    op->remote_read = op->plan.to_read;
  }

  dout(10) << __func__ << ": " << *op << dendl;

  if (!op->remote_read.empty()) {
    assert(get_parent()->get_pool().allows_ecoverwrites());
    objects_read_async_no_cache(
      op->remote_read,
      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
        for (auto &&i: results) {
          op->remote_read_result.emplace(i.first, i.second.second);
        }
        check_ops();
      });
  }

  return true;
}
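
// When the extent cache is usable, the split above sends only the extents
// the cache cannot serve (remote_read) to the other shards, and defers the
// rest (pending_read) to commit time, when
// cache.get_remaining_extents_for_rmw() fills them in. With caching disabled,
// every extent in the read plan becomes a remote read.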
bool ECBackend::try_reads_to_commit()
{
  if (waiting_reads.empty())
    return false;
  Op *op = &(waiting_reads.front());
  if (op->read_in_progress())
    return false;
  waiting_reads.pop_front();
  waiting_commit.push_back(*op);

  dout(10) << __func__ << ": starting commit on " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  get_parent()->apply_stats(
    op->hoid,
    op->delta_stats);

  if (op->using_cache) {
    for (auto &&hpair: op->pending_read) {
      op->remote_read_result[hpair.first].insert(
        cache.get_remaining_extents_for_rmw(
          hpair.first,
          op->pin,
          hpair.second));
    }
    op->pending_read.clear();
  } else {
    assert(op->pending_read.empty());
  }

  map<shard_id_t, ObjectStore::Transaction> trans;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_actingbackfill_shards().begin();
       i != get_parent()->get_actingbackfill_shards().end();
       ++i) {
    trans[i->shard];
  }

  op->trace.event("start ec write");

  map<hobject_t,extent_map> written;
  if (op->plan.t) {
    ECTransaction::generate_transactions(
      op->plan,
      ec_impl,
      get_parent()->get_info().pgid.pgid,
      (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
      sinfo,
      op->remote_read_result,
      op->log_entries,
      &written,
      &trans,
      &(op->temp_added),
      &(op->temp_cleared),
      get_parent()->get_dpp());
  }

  dout(20) << __func__ << ": " << cache << dendl;
  dout(20) << __func__ << ": written: " << written << dendl;
  dout(20) << __func__ << ": op: " << *op << dendl;

  if (!get_parent()->get_pool().allows_ecoverwrites()) {
    for (auto &&i: op->log_entries) {
      if (i.requires_kraken()) {
        derr << __func__ << ": log entry " << i << " requires kraken"
             << " but overwrites are not enabled!" << dendl;
        ceph_abort();
      }
    }
  }

  map<hobject_t,extent_set> written_set;
  for (auto &&i: written) {
    written_set[i.first] = i.second.get_interval_set();
  }
  dout(20) << __func__ << ": written_set: " << written_set << dendl;
  assert(written_set == op->plan.will_write);

  if (op->using_cache) {
    for (auto &&hpair: written) {
      dout(20) << __func__ << ": " << hpair << dendl;
      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
    }
  }
  op->remote_read.clear();
  op->remote_read_result.clear();

  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
  ObjectStore::Transaction empty;
  bool should_write_local = false;
  ECSubWrite local_write_op;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_actingbackfill_shards().begin();
       i != get_parent()->get_actingbackfill_shards().end();
       ++i) {
    op->pending_apply.insert(*i);
    op->pending_commit.insert(*i);
    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
      trans.find(i->shard);
    assert(iter != trans.end());
    bool should_send = get_parent()->should_send_op(*i, op->hoid);
    const pg_stat_t &stats =
      should_send ?
      get_info().stats :
      parent->get_shard_info().find(*i)->second.stats;

    ECSubWrite sop(
      get_parent()->whoami_shard(),
      op->tid,
      op->reqid,
      op->hoid,
      stats,
      should_send ? iter->second : empty,
      op->version,
      op->trim_to,
      op->roll_forward_to,
      op->log_entries,
      op->updated_hit_set_history,
      op->temp_added,
      op->temp_cleared,
      !should_send);

    ZTracer::Trace trace;
    if (op->trace) {
      // initialize a child span for this shard
      trace.init("ec sub write", nullptr, &op->trace);
      trace.keyval("shard", i->shard.id);
    }

    if (*i == get_parent()->whoami_shard()) {
      should_write_local = true;
      local_write_op.claim(sop);
    } else {
      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
      r->map_epoch = get_parent()->get_epoch();
      r->min_epoch = get_parent()->get_interval_start_epoch();
      r->trace = trace;
      get_parent()->send_message_osd_cluster(
        i->osd, r, get_parent()->get_epoch());
    }
  }
  if (should_write_local) {
    handle_sub_write(
      get_parent()->whoami_shard(),
      op->client_op,
      local_write_op,
      op->trace,
      op->on_local_applied_sync);
    op->on_local_applied_sync = 0;
  }

  for (auto i = op->on_write.begin();
       i != op->on_write.end();
       op->on_write.erase(i++)) {
    (*i)();
  }

  return true;
}
bool ECBackend::try_finish_rmw()
{
  if (waiting_commit.empty())
    return false;
  Op *op = &(waiting_commit.front());
  if (op->write_in_progress())
    return false;
  waiting_commit.pop_front();

  dout(10) << __func__ << ": " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  if (op->roll_forward_to > completed_to)
    completed_to = op->roll_forward_to;
  if (op->version > committed_to)
    committed_to = op->version;

  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
        waiting_reads.empty() &&
        waiting_commit.empty()) {
      // submit a dummy transaction to kick the rollforward
      auto tid = get_parent()->get_tid();
      Op *nop = &(tid_to_op_map[tid]);
      nop->hoid = op->hoid;
      nop->trim_to = op->trim_to;
      nop->roll_forward_to = op->version;
      nop->tid = tid;
      nop->reqid = op->reqid;
      waiting_reads.push_back(*nop);
    }
  }

  if (op->using_cache) {
    cache.release_write_pin(op->pin);
  }
  tid_to_op_map.erase(op->tid);

  if (waiting_reads.empty() &&
      waiting_commit.empty()) {
    pipeline_state.clear();
    dout(20) << __func__ << ": clearing pipeline_state "
             << pipeline_state
             << dendl;
  }
  return true;
}
void ECBackend::check_ops()
{
  while (try_state_to_reads() ||
         try_reads_to_commit() ||
         try_finish_rmw());
}
int ECBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return -EOPNOTSUPP;
}

void ECBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
    reads;

  uint32_t flags = 0;
  extent_set es;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
           pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end();
       ++i) {
    pair<uint64_t, uint64_t> tmp =
      sinfo.offset_len_to_stripe_bounds(
        make_pair(i->first.get<0>(), i->first.get<1>()));

    extent_set esnew;
    esnew.insert(tmp.first, tmp.second);
    es.union_of(esnew);
    flags |= i->first.get<2>();
  }

  if (!es.empty()) {
    auto &offsets = reads[hoid];
    for (auto j = es.begin();
         j != es.end();
         ++j) {
      offsets.push_back(
        boost::make_tuple(
          j.get_start(),
          j.get_len(),
          flags));
    }
  }

  struct cb {
    ECBackend *ec;
    hobject_t hoid;
    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
              pair<bufferlist*, Context*> > > to_read;
    unique_ptr<Context> on_complete;
    cb(const cb&) = delete;
    cb(cb &&) = default;
    cb(ECBackend *ec,
       const hobject_t &hoid,
       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
       Context *on_complete)
      : ec(ec),
        hoid(hoid),
        to_read(to_read),
        on_complete(on_complete) {}
    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
      auto dpp = ec->get_parent()->get_dpp();
      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
                         << dendl;
      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
                         << dendl;

      auto &got = results[hoid];

      int r = 0;
      for (auto &&read: to_read) {
        if (got.first < 0) {
          if (read.second.second) {
            read.second.second->complete(got.first);
          }
          if (r == 0)
            r = got.first;
        } else {
          assert(read.second.first);
          uint64_t offset = read.first.get<0>();
          uint64_t length = read.first.get<1>();
          auto range = got.second.get_containing_range(offset, length);
          assert(range.first != range.second);
          assert(range.first.get_off() <= offset);
          assert(
            (offset + length) <=
            (range.first.get_off() + range.first.get_len()));
          read.second.first->substr_of(
            range.first.get_val(),
            offset - range.first.get_off(),
            length);
          if (read.second.second) {
            read.second.second->complete(length);
            read.second.second = nullptr;
          }
        }
      }
      to_read.clear();
      if (on_complete) {
        on_complete.release()->complete(r);
      }
    }
    ~cb() {
      for (auto &&i: to_read) {
        delete i.second.second;
      }
      to_read.clear();
    }
  };
  objects_read_and_reconstruct(
    reads,
    fast_read,
    make_gen_lambda_context<
      map<hobject_t,pair<int, extent_map> > &&, cb>(
        cb(this,
           hoid,
           to_read,
           on_complete)));
}
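
/* Worked example of the stripe-bounds rounding above (illustrative
 * numbers, assuming k=2 data chunks of 4096 bytes, i.e. a stripe width
 * of 8192):
 *
 *   client asks for (offset=5000, len=2000)
 *   offset_len_to_stripe_bounds -> (0, 8192)  // round down/up to stripes
 *
 * The whole stripe range (0, 8192) is read and decoded; cb::operator()
 * then uses substr_of() on the returned extent_map so the caller's
 * bufferlist receives exactly bytes [5000, 7000).
 */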

struct CallClientContexts :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  hobject_t hoid;
  ECBackend *ec;
  ECBackend::ClientAsyncReadStatus *status;
  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
  CallClientContexts(
    hobject_t hoid,
    ECBackend *ec,
    ECBackend::ClientAsyncReadStatus *status,
    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    extent_map result;
    if (res.r != 0)
      goto out;
    assert(res.returned.size() == to_read.size());
    assert(res.errors.empty());
    for (auto &&read: to_read) {
      pair<uint64_t, uint64_t> adjusted =
        ec->sinfo.offset_len_to_stripe_bounds(
          make_pair(read.get<0>(), read.get<1>()));
      assert(res.returned.front().get<0>() == adjusted.first &&
             res.returned.front().get<1>() == adjusted.second);
      map<int, bufferlist> to_decode;
      bufferlist bl;
      for (map<pg_shard_t, bufferlist>::iterator j =
             res.returned.front().get<2>().begin();
           j != res.returned.front().get<2>().end();
           ++j) {
        to_decode[j->first.shard].claim(j->second);
      }
      int r = ECUtil::decode(
        ec->sinfo,
        ec->ec_impl,
        to_decode,
        &bl);
      if (r < 0) {
        res.r = r;
        goto out;
      }
      bufferlist trimmed;
      trimmed.substr_of(
        bl,
        read.get<0>() - adjusted.first,
        MIN(read.get<1>(),
            bl.length() - (read.get<0>() - adjusted.first)));
      result.insert(
        read.get<0>(), trimmed.length(), std::move(trimmed));
      res.returned.pop_front();
    }
out:
    status->complete_object(hoid, res.r, std::move(result));
    ec->kick_reads();
  }
};
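
/* Note on the decode step above: res.returned carries, per requested
 * extent, a map from pg_shard_t to that shard's chunk of the stripe.
 * Re-keying by shard id gives ECUtil::decode() the layout the erasure
 * code plugin expects; with enough chunks present it reconstructs the
 * stripe-aligned buffer, which is then trimmed back to the caller's
 * (offset, len) before being inserted into the result extent_map.
 */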

void ECBackend::objects_read_and_reconstruct(
  const map<hobject_t,
    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
  > &reads,
  bool fast_read,
  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
{
  in_progress_client_reads.emplace_back(
    reads.size(), std::move(func));
  if (!reads.size()) {
    kick_reads();
    return;
  }

  set<int> want_to_read;
  get_want_to_read_shards(&want_to_read);

  map<hobject_t, read_request_t> for_read_op;
  for (auto &&to_read: reads) {
    set<pg_shard_t> shards;
    int r = get_min_avail_to_read_shards(
      to_read.first,
      want_to_read,
      false,
      fast_read,
      &shards);
    assert(r == 0);

    CallClientContexts *c = new CallClientContexts(
      to_read.first,
      this,
      &(in_progress_client_reads.back()),
      to_read.second);
    for_read_op.insert(
      make_pair(
        to_read.first,
        read_request_t(
          to_read.second,
          shards,
          false,
          c)));
  }

  start_read_op(
    CEPH_MSG_PRIO_DEFAULT,
    for_read_op,
    OpRequestRef(),
    fast_read, false);
  return;
}
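
/* fast_read is forwarded to get_min_avail_to_read_shards() as its
 * "do redundant reads" flag: instead of asking exactly the minimum set
 * of shards, the read is also issued to additional available shards so
 * decode can start as soon as any sufficient subset answers, trading
 * extra backend I/O for lower tail latency.
 */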

int ECBackend::send_all_remaining_reads(
  const hobject_t &hoid,
  ReadOp &rop)
{
  set<int> already_read;
  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
    already_read.insert(i->shard);
  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
  set<pg_shard_t> shards;
  int r = get_remaining_shards(hoid, already_read, &shards);
  if (r)
    return r;
  if (shards.empty())
    return -EIO;

  dout(10) << __func__ << " Read remaining shards " << shards << dendl;

  // TODOSAM: this doesn't seem right
  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
    rop.to_read.find(hoid)->second.to_read;
  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
    rop.to_read.find(hoid)->second.cb;

  map<hobject_t, read_request_t> for_read_op;
  for_read_op.insert(
    make_pair(
      hoid,
      read_request_t(
        offsets,
        shards,
        false,
        c)));

  rop.to_read.swap(for_read_op);
  do_read_op(rop);
  return 0;
}
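
/* This is the read-error recovery path: when a shard read fails, the
 * op is re-issued against every shard we have not already read from
 * (or received an error from). As the TODOSAM above notes, the retry
 * reuses the original full offset list rather than only the extents
 * still missing, so it can re-read data already in hand.
 */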

int ECBackend::objects_get_attrs(
  const hobject_t &hoid,
  map<string, bufferlist> *out)
{
  int r = store->getattrs(
    ch,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    *out);
  if (r < 0)
    return r;

  for (map<string, bufferlist>::iterator i = out->begin();
       i != out->end();
       ) {
    if (ECUtil::is_hinfo_key_string(i->first))
      out->erase(i++);
    else
      ++i;
  }
  return r;
}
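
/* The hinfo key filtered out above holds the ECUtil::HashInfo
 * (per-chunk checksums and the total chunk size) that the EC backend
 * maintains internally as an object attribute; it is stripped here so
 * callers see only the attributes the client actually set.
 */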

void ECBackend::rollback_append(
  const hobject_t &hoid,
  uint64_t old_size,
  ObjectStore::Transaction *t)
{
  assert(old_size % sinfo.get_stripe_width() == 0);
  t->truncate(
    coll,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    sinfo.aligned_logical_offset_to_chunk_offset(
      old_size));
}
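
/* Worked example of the offset conversion above (illustrative numbers,
 * assuming k=2 data chunks of 4096 bytes, stripe width 8192): rolling
 * back an append that grew the object past 16384 logical bytes
 * truncates this shard's local object to
 *
 *   aligned_logical_offset_to_chunk_offset(16384) == 16384 / 2 == 8192
 *
 * since each shard stores 1/k of every stripe.
 */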

void ECBackend::be_deep_scrub(
  const hobject_t &poid,
  uint32_t seed,
  ScrubMap::object &o,
  ThreadPool::TPHandle &handle) {
  bufferhash h(-1); // we always used -1
  int r;
  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
  if (stride % sinfo.get_chunk_size())
    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
  uint64_t pos = 0;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  while (true) {
    bufferlist bl;
    handle.reset_tp_timeout();
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos,
      stride, bl,
      fadvise_flags, true);
    if (r < 0)
      break;
    if (bl.length() % sinfo.get_chunk_size()) {
      r = -EIO;
      break;
    }
    pos += r;
    h << bl;
    if ((unsigned)r < stride)
      break;
  }

  if (r == -EIO) {
    dout(0) << "_scan_list " << poid << " got "
            << r << " on read, read_error" << dendl;
    o.read_error = true;
    return;
  }

  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
  if (!hinfo) {
    dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
    o.read_error = true;
    o.digest_present = false;
    return;
  } else {
    if (!get_parent()->get_pool().allows_ecoverwrites()) {
      assert(hinfo->has_chunk_hash());
      if (hinfo->get_total_chunk_size() != pos) {
        dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
        o.ec_size_mismatch = true;
        return;
      }

      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
        dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
        o.ec_hash_mismatch = true;
        return;
      }

      /* We checked above that we match our own stored hash. We cannot
       * send a hash of the actual object, so instead we simply send
       * our locally stored hash of shard 0 on the assumption that if
       * we match our chunk hash and our recollection of the hash for
       * chunk 0 matches that of our peers, there is likely no corruption.
       */
      o.digest = hinfo->get_chunk_hash(0);
      o.digest_present = true;
    } else {
      /* Hack! We must be using partial overwrites, and partial overwrites
       * don't support deep-scrub yet
       */
      o.digest = 0;
      o.digest_present = true;
    }
  }

  o.omap_digest = seed;
  o.omap_digest_present = true;
}
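
/* Worked example of the stride rounding at the top of be_deep_scrub()
 * (illustrative numbers): with the default osd_deep_scrub_stride of
 * 524288 and a chunk size of 12288, 524288 % 12288 == 8192, so the
 * stride is bumped by 12288 - 8192 = 4096 to 528384 (= 43 chunks).
 * Reading whole chunks keeps the bl.length() % chunk_size check above
 * meaningful: a short, unaligned read indicates a truncated or corrupt
 * shard rather than merely the end of the object.
 */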