// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "ECBackend.h"

#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "ECMsgTypes.h"

#include "PrimaryLogPG.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}

struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
  list<ECBackend::RecoveryOp> ops;
};

ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
  switch (rhs.pipeline_state) {
  case ECBackend::pipeline_state_t::CACHE_VALID:
    return lhs << "CACHE_VALID";
  case ECBackend::pipeline_state_t::CACHE_INVALID:
    return lhs << "CACHE_INVALID";
  default:
    ceph_abort_msg("invalid pipeline state");
  }
  return lhs; // unreachable
}

static ostream &operator<<(
  ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
  lhs << "[";
  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(
  ostream &lhs, const map<int, bufferlist> &rhs)
{
  lhs << "[";
  for (map<int, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(
  ostream &lhs,
  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
{
  return lhs << "(" << rhs.get<0>() << ", "
             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
{
  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
             << ", need=" << rhs.need
             << ", want_attrs=" << rhs.want_attrs
             << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
{
  lhs << "read_result_t(r=" << rhs.r
      << ", errors=" << rhs.errors;
  if (rhs.attrs) {
    lhs << ", attrs=" << *(rhs.attrs);
  } else {
    lhs << ", noattrs";
  }
  return lhs << ", returned=" << rhs.returned << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
{
  lhs << "ReadOp(tid=" << rhs.tid;
  if (rhs.op && rhs.op->get_req()) {
    lhs << ", op=";
    rhs.op->get_req()->print(lhs);
  }
  return lhs << ", to_read=" << rhs.to_read
             << ", complete=" << rhs.complete
             << ", priority=" << rhs.priority
             << ", obj_to_source=" << rhs.obj_to_source
             << ", source_to_obj=" << rhs.source_to_obj
             << ", in_progress=" << rhs.in_progress << ")";
}

void ECBackend::ReadOp::dump(Formatter *f) const
{
  f->dump_unsigned("tid", tid);
  if (op && op->get_req()) {
    f->dump_stream("op") << *(op->get_req());
  }
  f->dump_stream("to_read") << to_read;
  f->dump_stream("complete") << complete;
  f->dump_int("priority", priority);
  f->dump_stream("obj_to_source") << obj_to_source;
  f->dump_stream("source_to_obj") << source_to_obj;
  f->dump_stream("in_progress") << in_progress;
}

ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
{
  lhs << "Op(" << rhs.hoid
      << " v=" << rhs.version
      << " tt=" << rhs.trim_to
      << " tid=" << rhs.tid
      << " reqid=" << rhs.reqid;
  if (rhs.client_op && rhs.client_op->get_req()) {
    lhs << " client_op=";
    rhs.client_op->get_req()->print(lhs);
  }
  lhs << " roll_forward_to=" << rhs.roll_forward_to
      << " temp_added=" << rhs.temp_added
      << " temp_cleared=" << rhs.temp_cleared
      << " pending_read=" << rhs.pending_read
      << " remote_read=" << rhs.remote_read
      << " remote_read_result=" << rhs.remote_read_result
      << " pending_apply=" << rhs.pending_apply
      << " pending_commit=" << rhs.pending_commit
      << " plan.to_read=" << rhs.plan.to_read
      << " plan.will_write=" << rhs.plan.will_write
      << ")";
  return lhs;
}

ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
{
  return lhs << "RecoveryOp("
             << "hoid=" << rhs.hoid
             << " v=" << rhs.v
             << " missing_on=" << rhs.missing_on
             << " missing_on_shards=" << rhs.missing_on_shards
             << " recovery_info=" << rhs.recovery_info
             << " recovery_progress=" << rhs.recovery_progress
             << " obc refcount=" << rhs.obc.use_count()
             << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
             << " waiting_on_pushes=" << rhs.waiting_on_pushes
             << " extent_requested=" << rhs.extent_requested
             << ")";
}

void ECBackend::RecoveryOp::dump(Formatter *f) const
{
  f->dump_stream("hoid") << hoid;
  f->dump_stream("v") << v;
  f->dump_stream("missing_on") << missing_on;
  f->dump_stream("missing_on_shards") << missing_on_shards;
  f->dump_stream("recovery_info") << recovery_info;
  f->dump_stream("recovery_progress") << recovery_progress;
  f->dump_stream("state") << tostr(state);
  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
  f->dump_stream("extent_requested") << extent_requested;
}

ECBackend::ECBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &ch,
  ObjectStore *store,
  CephContext *cct,
  ErasureCodeInterfaceRef ec_impl,
  uint64_t stripe_width)
  : PGBackend(cct, pg, store, coll, ch),
    ec_impl(ec_impl),
    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
  ceph_assert((ec_impl->get_data_chunk_count() *
               ec_impl->get_chunk_size(stripe_width)) == stripe_width);
}
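
// Added commentary on the ECBackend constructor above: the assertion pins
// down the stripe geometry used throughout this file. As an illustrative
// example (values assumed, not taken from any configuration): with
// k = get_data_chunk_count() = 4 data chunks and stripe_width = 16384
// bytes, get_chunk_size(16384) must return 4096, since 4 * 4096 == 16384.
// An erasure plugin whose chunk size does not evenly divide the configured
// stripe width would trip this assertion at construction time.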

PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
{
  return new ECRecoveryHandle;
}

void ECBackend::_failed_push(const hobject_t &hoid,
  pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
{
  ECBackend::read_result_t &res = in.second;
  dout(10) << __func__ << ": Read error " << hoid << " r="
	   << res.r << " errors=" << res.errors << dendl;
  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
	   << dendl;
  ceph_assert(recovery_ops.count(hoid));
  eversion_t v = recovery_ops[hoid].v;
  recovery_ops.erase(hoid);

  set<pg_shard_t> fl;
  for (auto&& i : res.errors) {
    fl.insert(i.first);
  }
  get_parent()->on_failed_pull(fl, hoid, v);
}

struct OnRecoveryReadComplete :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  ECBackend *pg;
  hobject_t hoid;
  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
    : pg(pg), hoid(hoid) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    if (!(res.r == 0 && res.errors.empty())) {
      pg->_failed_push(hoid, in);
      return;
    }
    ceph_assert(res.returned.size() == 1);
    pg->handle_recovery_read_complete(
      hoid,
      res.returned.back(),
      res.attrs,
      in.first);
  }
};

struct RecoveryMessages {
  map<hobject_t,
      ECBackend::read_request_t> reads;
  map<hobject_t, set<int>> want_to_read;
  void read(
    ECBackend *ec,
    const hobject_t &hoid, uint64_t off, uint64_t len,
    set<int> &&_want_to_read,
    const map<pg_shard_t, vector<pair<int, int>>> &need,
    bool attrs) {
    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    to_read.push_back(boost::make_tuple(off, len, 0));
    ceph_assert(!reads.count(hoid));
    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
    reads.insert(
      make_pair(
	hoid,
	ECBackend::read_request_t(
	  to_read,
	  need,
	  attrs,
	  new OnRecoveryReadComplete(
	    ec,
	    hoid))));
  }

  map<pg_shard_t, vector<PushOp> > pushes;
  map<pg_shard_t, vector<PushReplyOp> > push_replies;
  ObjectStore::Transaction t;
  RecoveryMessages() {}
  ~RecoveryMessages() {}
};
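
// Added commentary on RecoveryMessages above: this struct is a scratch
// accumulator. Recovery code fills in reads, pushes, push_replies and the
// transaction t while one recovery step runs, and
// dispatch_recovery_messages() later flushes the whole batch in a single
// pass, so the sub-operations generated by one recovery tick go out
// together rather than one message at a time.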

void ECBackend::handle_recovery_push(
  const PushOp &op,
  RecoveryMessages *m,
  bool is_repair)
{
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }

  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
  ghobject_t tobj;
  if (oneshot) {
    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
		      get_parent()->whoami_shard().shard);
  } else {
    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
							     op.version),
		      ghobject_t::NO_GEN,
		      get_parent()->whoami_shard().shard);
    if (op.before_progress.first) {
      dout(10) << __func__ << ": Adding oid "
	       << tobj.hobj << " in the temp collection" << dendl;
      add_temp_obj(tobj.hobj);
    }
  }

  if (op.before_progress.first) {
    m->t.remove(coll, tobj);
    m->t.touch(coll, tobj);
  }

  if (!op.data_included.empty()) {
    uint64_t start = op.data_included.range_start();
    uint64_t end = op.data_included.range_end();
    ceph_assert(op.data.length() == (end - start));

    m->t.write(
      coll,
      tobj,
      start,
      op.data.length(),
      op.data);
  } else {
    ceph_assert(op.data.length() == 0);
  }

  if (get_parent()->pg_is_remote_backfilling()) {
    get_parent()->pg_add_local_num_bytes(op.data.length());
    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
    dout(10) << __func__ << " " << op.soid
	     << " add new actual data by " << op.data.length()
	     << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
	     << dendl;
  }

  if (op.before_progress.first) {
    ceph_assert(op.attrset.count(string("_")));
    m->t.setattrs(
      coll,
      tobj,
      op.attrset);
  }

  if (op.after_progress.data_complete && !oneshot) {
    dout(10) << __func__ << ": Removing oid "
	     << tobj.hobj << " from the temp collection" << dendl;
    clear_temp_obj(tobj.hobj);
    m->t.remove(coll, ghobject_t(
	op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
    m->t.collection_move_rename(
      coll, tobj,
      coll, ghobject_t(
	op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  }
  if (op.after_progress.data_complete) {
    if ((get_parent()->pgb_is_primary())) {
      ceph_assert(recovery_ops.count(op.soid));
      ceph_assert(recovery_ops[op.soid].obc);
      if (get_parent()->pg_is_repair())
	get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
	op.soid,
	op.recovery_info,
	recovery_ops[op.soid].obc,
	false,
	&m->t);
    } else {
      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
      if (is_repair)
	get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
	op.soid,
	op.recovery_info,
	ObjectContextRef(),
	false,
	&m->t);
      if (get_parent()->pg_is_remote_backfilling()) {
	struct stat st;
	int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
			    get_parent()->whoami_shard().shard), &st);
	if (r == 0) {
	  get_parent()->pg_sub_local_num_bytes(st.st_size);
	  // XXX: This can be way overestimated for small objects
	  get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
	  dout(10) << __func__ << " " << op.soid
		   << " sub actual data by " << st.st_size
		   << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
		   << dendl;
	}
      }
    }
  }
  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
}
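
// Added commentary on handle_recovery_push() above: two write paths exist.
// When a push carries a whole object (before_progress.first &&
// after_progress.data_complete, the "oneshot" case) it is written directly
// to the final shard object. Otherwise chunks accumulate in a temp object
// that is renamed into place via collection_move_rename() only once
// data_complete is reached, so a partially pushed object is never visible
// under its final name.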

void ECBackend::handle_recovery_push_reply(
  const PushReplyOp &op,
  pg_shard_t from,
  RecoveryMessages *m)
{
  if (!recovery_ops.count(op.soid))
    return;
  RecoveryOp &rop = recovery_ops[op.soid];
  ceph_assert(rop.waiting_on_pushes.count(from));
  rop.waiting_on_pushes.erase(from);
  continue_recovery_op(rop, m);
}

void ECBackend::handle_recovery_read_complete(
  const hobject_t &hoid,
  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
  std::optional<map<string, bufferlist> > attrs,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": returned " << hoid << " "
	   << "(" << to_read.get<0>()
	   << ", " << to_read.get<1>()
	   << ", " << to_read.get<2>()
	   << ")"
	   << dendl;
  ceph_assert(recovery_ops.count(hoid));
  RecoveryOp &op = recovery_ops[hoid];
  ceph_assert(op.returned_data.empty());
  map<int, bufferlist*> target;
  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
       i != op.missing_on_shards.end();
       ++i) {
    target[*i] = &(op.returned_data[*i]);
  }
  map<int, bufferlist> from;
  for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
      i != to_read.get<2>().end();
      ++i) {
    from[i->first.shard].claim(i->second);
  }
  dout(10) << __func__ << ": " << from << dendl;
  int r;
  r = ECUtil::decode(sinfo, ec_impl, from, target);
  ceph_assert(r == 0);
  if (attrs) {
    op.xattrs.swap(*attrs);

    if (!op.obc) {
      // attrs only reference the origin bufferlist (decode from
      // ECSubReadReply message) whose size is much greater than attrs
      // in recovery. If obc cache it (get_obc maybe cache the attr),
      // this causes the whole origin bufferlist would not be free
      // until obc is evicted from obc cache. So rebuild the
      // bufferlist before cache it.
      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
	   it != op.xattrs.end();
	   ++it) {
	it->second.rebuild();
      }
      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
      // of the backend (see bug #12983)
      map<string, bufferlist> sanitized_attrs(op.xattrs);
      sanitized_attrs.erase(ECUtil::get_hinfo_key());
      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
      ceph_assert(op.obc);
      op.recovery_info.size = op.obc->obs.oi.size;
      op.recovery_info.oi = op.obc->obs.oi;
    }

    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (op.obc->obs.oi.size > 0) {
      ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
      auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
      decode(hinfo, bp);
    }
    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  ceph_assert(op.xattrs.size());
  ceph_assert(op.obc);
  continue_recovery_op(op, m);
}
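
// Added commentary on handle_recovery_read_complete() above: a recovery
// read returns one (offset, length, shard->bufferlist) tuple, and
// ECUtil::decode() rebuilds exactly the shards listed in missing_on_shards
// into op.returned_data. The xattr rebuild matters because each attr
// bufferlist otherwise pins the entire ECSubReadReply receive buffer for
// as long as the obc stays cached.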

struct SendPushReplies : public Context {
  PGBackend::Listener *l;
  epoch_t epoch;
  map<int, MOSDPGPushReply*> replies;
  SendPushReplies(
    PGBackend::Listener *l,
    epoch_t epoch,
    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
    replies.swap(in);
  }
  void finish(int) override {
    std::vector<std::pair<int, Message*>> messages;
    messages.reserve(replies.size());
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
	 i != replies.end();
	 ++i) {
      messages.push_back(std::make_pair(i->first, i->second));
    }
    if (!messages.empty()) {
      l->send_message_osd_cluster(messages, epoch);
    }
    replies.clear();
  }
  ~SendPushReplies() override {
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
	 i != replies.end();
	 ++i) {
      i->second->put();
    }
    replies.clear();
  }
};

void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
       i != m.pushes.end();
       m.pushes.erase(i++)) {
    MOSDPGPush *msg = new MOSDPGPush();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->pushes.swap(i->second);
    msg->compute_cost(cct);
    msg->is_repair = get_parent()->pg_is_repair();
    get_parent()->send_message(
      i->first.osd,
      msg);
  }
  map<int, MOSDPGPushReply*> replies;
  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
	 m.push_replies.begin();
       i != m.push_replies.end();
       m.push_replies.erase(i++)) {
    MOSDPGPushReply *msg = new MOSDPGPushReply();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->replies.swap(i->second);
    msg->compute_cost(cct);
    replies.insert(make_pair(i->first.osd, msg));
  }

  if (!replies.empty()) {
    (m.t).register_on_complete(
	get_parent()->bless_context(
	  new SendPushReplies(
	    get_parent(),
	    get_osdmap_epoch(),
	    replies)));
    get_parent()->queue_transaction(std::move(m.t));
  }
}
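
// Added commentary on dispatch_recovery_messages() above: pushes are sent
// immediately, one message per target shard, while push replies are
// deferred. The replies are registered as an on_complete callback of m.t,
// so the primary only learns that a push was applied after the local
// transaction commits.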

void ECBackend::continue_recovery_op(
  RecoveryOp &op,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": continuing " << op << dendl;
  while (1) {
    switch (op.state) {
    case RecoveryOp::IDLE: {
      // start read
      op.state = RecoveryOp::READING;
      ceph_assert(!op.recovery_progress.data_complete);
      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
      uint64_t from = op.recovery_progress.data_recovered_to;
      uint64_t amount = get_recovery_chunk_size();

      if (op.recovery_progress.first && op.obc) {
	/* We've got the attrs and the hinfo, might as well use them */
	op.hinfo = get_hash_info(op.hoid);
	ceph_assert(op.hinfo);
	op.xattrs = op.obc->attr_cache;
	encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
      }

      map<pg_shard_t, vector<pair<int, int>>> to_read;
      int r = get_min_avail_to_read_shards(
	op.hoid, want, true, false, &to_read);
      if (r != 0) {
	// we must have lost a recovery source
	ceph_assert(!op.recovery_progress.first);
	dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
		 << dendl;
	get_parent()->cancel_pull(op.hoid);
	recovery_ops.erase(op.hoid);
	return;
      }
      m->read(
	this,
	op.hoid,
	op.recovery_progress.data_recovered_to,
	amount,
	std::move(want),
	to_read,
	op.recovery_progress.first && !op.obc);
      op.extent_requested = make_pair(
	from,
	amount);
      dout(10) << __func__ << ": IDLE return " << op << dendl;
      return;
    }
    case RecoveryOp::READING: {
      // read completed, start write
      ceph_assert(op.xattrs.size());
      ceph_assert(op.returned_data.size());
      op.state = RecoveryOp::WRITING;
      ObjectRecoveryProgress after_progress = op.recovery_progress;
      after_progress.data_recovered_to += op.extent_requested.second;
      after_progress.first = false;
      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
	after_progress.data_recovered_to =
	  sinfo.logical_to_next_stripe_offset(
	    op.obc->obs.oi.size);
	after_progress.data_complete = true;
      }
      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
	   mi != op.missing_on.end();
	   ++mi) {
	ceph_assert(op.returned_data.count(mi->shard));
	m->pushes[*mi].push_back(PushOp());
	PushOp &pop = m->pushes[*mi].back();
	pop.soid = op.hoid;
	pop.version = op.v;
	pop.data = op.returned_data[mi->shard];
	dout(10) << __func__ << ": before_progress=" << op.recovery_progress
		 << ", after_progress=" << after_progress
		 << ", pop.data.length()=" << pop.data.length()
		 << ", size=" << op.obc->obs.oi.size << dendl;
	ceph_assert(
	  pop.data.length() ==
	  sinfo.aligned_logical_offset_to_chunk_offset(
	    after_progress.data_recovered_to -
	    op.recovery_progress.data_recovered_to)
	  );
	if (pop.data.length())
	  pop.data_included.insert(
	    sinfo.aligned_logical_offset_to_chunk_offset(
	      op.recovery_progress.data_recovered_to),
	    pop.data.length());
	if (op.recovery_progress.first) {
	  pop.attrset = op.xattrs;
	}
	pop.recovery_info = op.recovery_info;
	pop.before_progress = op.recovery_progress;
	pop.after_progress = after_progress;
	if (*mi != get_parent()->primary_shard())
	  get_parent()->begin_peer_recover(
	    *mi,
	    op.hoid);
      }
      op.returned_data.clear();
      op.waiting_on_pushes = op.missing_on;
      op.recovery_progress = after_progress;
      dout(10) << __func__ << ": READING return " << op << dendl;
      return;
    }
    case RecoveryOp::WRITING: {
      if (op.waiting_on_pushes.empty()) {
	if (op.recovery_progress.data_complete) {
	  op.state = RecoveryOp::COMPLETE;
	  for (set<pg_shard_t>::iterator i = op.missing_on.begin();
	       i != op.missing_on.end();
	       ++i) {
	    if (*i != get_parent()->primary_shard()) {
	      dout(10) << __func__ << ": on_peer_recover on " << *i
		       << ", obj " << op.hoid << dendl;
	      get_parent()->on_peer_recover(
		*i,
		op.hoid,
		op.recovery_info);
	    }
	  }
	  object_stat_sum_t stat;
	  stat.num_bytes_recovered = op.recovery_info.size;
	  stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
	  stat.num_objects_recovered = 1;
	  if (get_parent()->pg_is_repair())
	    stat.num_objects_repaired = 1;
	  get_parent()->on_global_recover(op.hoid, stat, false);
	  dout(10) << __func__ << ": WRITING return " << op << dendl;
	  recovery_ops.erase(op.hoid);
	  return;
	} else {
	  op.state = RecoveryOp::IDLE;
	  dout(10) << __func__ << ": WRITING continue " << op << dendl;
	  continue;
	}
      }
      return;
    }
    // should never be called once complete
    case RecoveryOp::COMPLETE:
    default: {
      ceph_abort();
    };
    }
  }
}
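
// Added commentary on continue_recovery_op() above: this is a small
// per-object state machine, roughly:
//
//   IDLE    --(issue read of next chunk range)-->   READING
//   READING --(decode + queue PushOps to peers)-->  WRITING
//   WRITING --(all pushes acked, more data left)--> IDLE
//   WRITING --(all pushes acked, data_complete)-->  COMPLETE
//
// Each pass handles at most get_recovery_chunk_size() bytes, so large
// objects cycle through IDLE/READING/WRITING several times.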

void ECBackend::run_recovery_op(
  RecoveryHandle *_h,
  int priority)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  RecoveryMessages m;
  for (list<RecoveryOp>::iterator i = h->ops.begin();
       i != h->ops.end();
       ++i) {
    dout(10) << __func__ << ": starting " << *i << dendl;
    ceph_assert(!recovery_ops.count(i->hoid));
    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
    continue_recovery_op(op, &m);
  }

  dispatch_recovery_messages(m, priority);
  send_recovery_deletes(priority, h->deletes);
  delete _h;
}

int ECBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  h->ops.push_back(RecoveryOp());
  h->ops.back().v = v;
  h->ops.back().hoid = hoid;
  h->ops.back().obc = obc;
  h->ops.back().recovery_info.soid = hoid;
  h->ops.back().recovery_info.version = v;
  if (obc) {
    h->ops.back().recovery_info.size = obc->obs.oi.size;
    h->ops.back().recovery_info.oi = obc->obs.oi;
  }
  if (hoid.is_snap()) {
    if (obc) {
      ceph_assert(obc->ssc);
      h->ops.back().recovery_info.ss = obc->ssc->snapset;
    } else if (head) {
      ceph_assert(head->ssc);
      h->ops.back().recovery_info.ss = head->ssc->snapset;
    } else {
      ceph_abort_msg("neither obc nor head set for a snap object");
    }
  }
  h->ops.back().recovery_progress.omap_complete = true;
  for (set<pg_shard_t>::const_iterator i =
	 get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    dout(10) << "checking " << *i << dendl;
    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
      h->ops.back().missing_on.insert(*i);
      h->ops.back().missing_on_shards.insert(i->shard);
    }
  }
  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
  return 0;
}

bool ECBackend::can_handle_while_inactive(
  OpRequestRef _op)
{
  return false;
}

bool ECBackend::_handle_message(
  OpRequestRef _op)
{
  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
  int priority = _op->get_req()->get_priority();
  switch (_op->get_req()->get_type()) {
  case MSG_OSD_EC_WRITE: {
    // NOTE: this is non-const because handle_sub_write modifies the embedded
    // ObjectStore::Transaction in place (and then std::move's it).  It does
    // not conflict with ECSubWrite's operator<<.
    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
      _op->get_nonconst_req());
    parent->maybe_preempt_replica_scrub(op->op.soid);
    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_WRITE_REPLY: {
    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
      _op->get_req());
    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_READ: {
    auto op = _op->get_req<MOSDECSubOpRead>();
    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
    reply->pgid = get_parent()->primary_spg_t();
    reply->map_epoch = get_osdmap_epoch();
    reply->min_epoch = get_parent()->get_interval_start_epoch();
    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
    reply->trace = _op->pg_trace;
    get_parent()->send_message_osd_cluster(
      reply, _op->get_req()->get_connection());
    return true;
  }
  case MSG_OSD_EC_READ_REPLY: {
    // NOTE: this is non-const because handle_sub_read_reply steals resulting
    // buffers.  It does not conflict with ECSubReadReply operator<<.
    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
      _op->get_nonconst_req());
    RecoveryMessages rm;
    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH: {
    auto op = _op->get_req<MOSDPGPush>();
    RecoveryMessages rm;
    for (vector<PushOp>::const_iterator i = op->pushes.begin();
	 i != op->pushes.end();
	 ++i) {
      handle_recovery_push(*i, &rm, op->is_repair);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH_REPLY: {
    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
      _op->get_req());
    RecoveryMessages rm;
    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
	 i != op->replies.end();
	 ++i) {
      handle_recovery_push_reply(*i, op->from, &rm);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  default:
    return false;
  }
  return false;
}

struct SubWriteCommitted : public Context {
  ECBackend *pg;
  OpRequestRef msg;
  ceph_tid_t tid;
  eversion_t version;
  eversion_t last_complete;
  const ZTracer::Trace trace;
  SubWriteCommitted(
    ECBackend *pg,
    OpRequestRef msg,
    ceph_tid_t tid,
    eversion_t version,
    eversion_t last_complete,
    const ZTracer::Trace &trace)
    : pg(pg), msg(msg), tid(tid),
      version(version), last_complete(last_complete), trace(trace) {}
  void finish(int) override {
    if (msg)
      msg->mark_event("sub_op_committed");
    pg->sub_write_committed(tid, version, last_complete, trace);
  }
};

void ECBackend::sub_write_committed(
  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
  const ZTracer::Trace &trace) {
  if (get_parent()->pgb_is_primary()) {
    ECSubWriteReply reply;
    reply.tid = tid;
    reply.last_complete = last_complete;
    reply.committed = true;
    reply.applied = true;
    reply.from = get_parent()->whoami_shard();
    handle_sub_write_reply(
      get_parent()->whoami_shard(),
      reply, trace);
  } else {
    get_parent()->update_last_complete_ondisk(last_complete);
    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
    r->pgid = get_parent()->primary_spg_t();
    r->map_epoch = get_osdmap_epoch();
    r->min_epoch = get_parent()->get_interval_start_epoch();
    r->op.tid = tid;
    r->op.last_complete = last_complete;
    r->op.committed = true;
    r->op.applied = true;
    r->op.from = get_parent()->whoami_shard();
    r->set_priority(CEPH_MSG_PRIO_HIGH);
    r->trace = trace;
    r->trace.event("sending sub op commit");
    get_parent()->send_message_osd_cluster(
      get_parent()->primary_shard().osd, r, get_osdmap_epoch());
  }
}

void ECBackend::handle_sub_write(
  pg_shard_t from,
  OpRequestRef msg,
  ECSubWrite &op,
  const ZTracer::Trace &trace)
{
  if (msg)
    msg->mark_event("sub_op_started");
  trace.event("handle_sub_write");
  if (!get_parent()->pgb_is_primary())
    get_parent()->update_stats(op.stats);
  ObjectStore::Transaction localt;
  if (!op.temp_added.empty()) {
    add_temp_objs(op.temp_added);
  }
  if (op.backfill_or_async_recovery) {
    for (set<hobject_t>::iterator i = op.temp_removed.begin();
	 i != op.temp_removed.end();
	 ++i) {
      dout(10) << __func__ << ": removing object " << *i
	       << " since we won't get the transaction" << dendl;
      localt.remove(
	coll,
	ghobject_t(
	  *i,
	  ghobject_t::NO_GEN,
	  get_parent()->whoami_shard().shard));
    }
  }
  clear_temp_objs(op.temp_removed);
  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(op.soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
    for (auto &&e: op.log_entries) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }
  get_parent()->log_operation(
    op.log_entries,
    op.updated_hit_set_history,
    op.trim_to,
    op.roll_forward_to,
    !op.backfill_or_async_recovery,
    localt,
    async);

  if (!get_parent()->pg_is_undersized() &&
      (unsigned)get_parent()->whoami_shard().shard >=
      ec_impl->get_data_chunk_count())
    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  localt.register_on_commit(
    get_parent()->bless_context(
      new SubWriteCommitted(
	this, msg, op.tid,
	op.at_version,
	get_parent()->get_info().last_complete, trace)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(op.t));
  tls.push_back(std::move(localt));
  get_parent()->queue_transactions(tls, msg);
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
  if (op.at_version != eversion_t()) {
    // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
    get_parent()->op_applied(op.at_version);
  }
}
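
// Added commentary on handle_sub_write() above: every sub-write queues two
// transactions together: op.t (the erasure-coded payload prepared by the
// primary) and localt (log entries plus the SubWriteCommitted completion).
// Queueing them as one batch keeps the commit callback ordered after the
// data write on this shard.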

void ECBackend::handle_sub_read(
  pg_shard_t from,
  const ECSubRead &op,
  ECSubReadReply *reply,
  const ZTracer::Trace &trace)
{
  trace.event("handle sub read");
  shard_id_t shard = get_parent()->whoami_shard().shard;
  for(auto i = op.to_read.begin();
      i != op.to_read.end();
      ++i) {
    int r = 0;
    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
      bufferlist bl;
      if ((op.subchunks.find(i->first)->second.size() == 1) &&
	  (op.subchunks.find(i->first)->second.front().second ==
	   ec_impl->get_sub_chunk_count())) {
	dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
	r = store->read(
	  ch,
	  ghobject_t(i->first, ghobject_t::NO_GEN, shard),
	  j->get<0>(),
	  j->get<1>(),
	  bl, j->get<2>()); // Allow EIO return
      } else {
	dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
	int subchunk_size =
	  sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
	bool error = false;
	for (int m = 0; m < (int)j->get<1>() && !error;
	     m += sinfo.get_chunk_size()) {
	  for (auto &&k:op.subchunks.find(i->first)->second) {
	    bufferlist bl0;
	    r = store->read(
		ch,
		ghobject_t(i->first, ghobject_t::NO_GEN, shard),
		j->get<0>() + m + (k.first)*subchunk_size,
		(k.second)*subchunk_size,
		bl0, j->get<2>());
	    if (r < 0) {
	      error = true;
	      break;
	    }
	    bl.claim_append(bl0);
	  }
	}
      }

      if (r < 0) {
	// if we are doing fast reads, it's possible for one of the shard
	// reads to cross paths with another update and get a (harmless)
	// ENOENT.  Suppress the message to the cluster log in that case.
	if (r == -ENOENT && get_parent()->get_pool().fast_read) {
	  dout(5) << __func__ << ": Error " << r
		  << " reading " << i->first << ", fast read, probably ok"
		  << dendl;
	} else {
	  get_parent()->clog_error() << "Error " << r
				     << " reading object "
				     << i->first;
	  dout(5) << __func__ << ": Error " << r
		  << " reading " << i->first << dendl;
	}
	goto error;
      } else {
	dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
	reply->buffers_read[i->first].push_back(
	  make_pair(
	    j->get<0>(),
	    bl)
	  );
      }

      if (!get_parent()->get_pool().allows_ecoverwrites()) {
	// This shows that we still need deep scrub because large enough files
	// are read in sections, so the digest check here won't be done here.
	// Do NOT check osd_read_eio_on_bad_digest here.  We need to report
	// the state of our chunk in case other chunks could substitute.
	ECUtil::HashInfoRef hinfo;
	hinfo = get_hash_info(i->first);
	if (!hinfo) {
	  r = -EIO;
	  get_parent()->clog_error() << "Corruption detected: object "
				     << i->first
				     << " is missing hash_info";
	  dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
	  goto error;
	}
	ceph_assert(hinfo->has_chunk_hash());
	if ((bl.length() == hinfo->get_total_chunk_size()) &&
	    (j->get<0>() == 0)) {
	  dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
	  bufferhash h(-1);
	  h << bl;
	  if (h.digest() != hinfo->get_chunk_hash(shard)) {
	    get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
				       << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
	    dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
		    << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
	    r = -EIO;
	    goto error;
	  }
	}
      }
    }
    continue;
error:
    // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
    // the state of our chunk in case other chunks could substitute.
    reply->buffers_read.erase(i->first);
    reply->errors[i->first] = r;
  }
  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
       i != op.attrs_to_read.end();
       ++i) {
    dout(10) << __func__ << ": fulfilling attr request on "
	     << *i << dendl;
    if (reply->errors.count(*i))
      continue;
    int r = store->getattrs(
      ch,
      ghobject_t(
	*i, ghobject_t::NO_GEN, shard),
      reply->attrs_read[*i]);
    if (r < 0) {
      // If we read error, we should not return the attrs too.
      reply->attrs_read.erase(*i);
      reply->buffers_read.erase(*i);
      reply->errors[*i] = r;
    }
  }
  reply->from = get_parent()->whoami_shard();
  reply->tid = op.tid;
}
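
// Added commentary on handle_sub_read() above: "case1" is a single
// contiguous read of this shard's chunk; "case2" stitches a partial chunk
// out of subchunk runs. As an illustrative example (numbers assumed, not
// from any profile): with get_chunk_size() = 4096 and
// get_sub_chunk_count() = 4, subchunk_size is 1024, so a subchunk entry
// (k.first = 1, k.second = 2) reads 2048 bytes starting 1024 bytes into
// each 4096-byte chunk stride of the requested range.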

void ECBackend::handle_sub_write_reply(
  pg_shard_t from,
  const ECSubWriteReply &op,
  const ZTracer::Trace &trace)
{
  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
  ceph_assert(i != tid_to_op_map.end());
  if (op.committed) {
    trace.event("sub write committed");
    ceph_assert(i->second.pending_commit.count(from));
    i->second.pending_commit.erase(from);
    if (from != get_parent()->whoami_shard()) {
      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
    }
  }
  if (op.applied) {
    trace.event("sub write applied");
    ceph_assert(i->second.pending_apply.count(from));
    i->second.pending_apply.erase(from);
  }

  if (i->second.pending_commit.empty() &&
      i->second.on_all_commit &&
      // also wait for apply, to preserve ordering with luminous peers.
      i->second.pending_apply.empty()) {
    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
    i->second.on_all_commit->complete(0);
    i->second.on_all_commit = 0;
    i->second.trace.event("ec write all committed");
  }
  check_ops();
}

void ECBackend::handle_sub_read_reply(
  pg_shard_t from,
  ECSubReadReply &op,
  RecoveryMessages *m,
  const ZTracer::Trace &trace)
{
  trace.event("ec sub read reply");
  dout(10) << __func__ << ": reply " << op << dendl;
  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
  if (iter == tid_to_read_map.end()) {
    //canceled
    dout(20) << __func__ << ": dropped " << op << dendl;
    return;
  }
  ReadOp &rop = iter->second;
  for (auto i = op.buffers_read.begin();
       i != op.buffers_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first));	// If attribute error we better not have sent a buffer
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
      rop.to_read.find(i->first)->second.to_read.begin();
    list<boost::tuple<
      uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
      rop.complete[i->first].returned.begin();
    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
	 j != i->second.end();
	 ++j, ++req_iter, ++riter) {
      ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
      ceph_assert(riter != rop.complete[i->first].returned.end());
      pair<uint64_t, uint64_t> adjusted =
	sinfo.aligned_offset_len_to_chunk(
	  make_pair(req_iter->get<0>(), req_iter->get<1>()));
      ceph_assert(adjusted.first == j->first);
      riter->get<2>()[from].claim(j->second);
    }
  }
  for (auto i = op.attrs_read.begin();
       i != op.attrs_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first));	// if read error better not have sent an attribute
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    rop.complete[i->first].attrs = map<string, bufferlist>();
    (*(rop.complete[i->first].attrs)).swap(i->second);
  }
  for (auto i = op.errors.begin();
       i != op.errors.end();
       ++i) {
    rop.complete[i->first].errors.insert(
      make_pair(
	from,
	i->second));
    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
  }

  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
    shard_to_read_map.find(from);
  ceph_assert(siter != shard_to_read_map.end());
  ceph_assert(siter->second.count(op.tid));
  siter->second.erase(op.tid);

  ceph_assert(rop.in_progress.count(from));
  rop.in_progress.erase(from);
  unsigned is_complete = 0;
  // For redundant reads check for completion as each shard comes in,
  // or in a non-recovery read check for completion once all the shards read.
  if (rop.do_redundant_reads || rop.in_progress.empty()) {
    for (map<hobject_t, read_result_t>::const_iterator iter =
	   rop.complete.begin();
	 iter != rop.complete.end();
	 ++iter) {
      set<int> have;
      for (map<pg_shard_t, bufferlist>::const_iterator j =
	     iter->second.returned.front().get<2>().begin();
	   j != iter->second.returned.front().get<2>().end();
	   ++j) {
	have.insert(j->first.shard);
	dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
      }
      map<int, vector<pair<int, int>>> dummy_minimum;
      int err;
      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
	dout(20) << __func__ << " minimum_to_decode failed" << dendl;
	if (rop.in_progress.empty()) {
	  // If we don't have enough copies, try other pg_shard_ts if available.
	  // During recovery there may be multiple osds with copies of the same shard,
	  // so getting EIO from one may result in multiple passes through this code path.
	  if (!rop.do_redundant_reads) {
	    int r = send_all_remaining_reads(iter->first, rop);
	    if (r == 0) {
	      // We added to in_progress and not incrementing is_complete
	      continue;
	    }
	    // Couldn't read any additional shards so handle as completed with errors
	  }
	  // We don't want to confuse clients / RBD with objectstore error
	  // values in particular ENOENT.  We may have different error returns
	  // from different shards, so we'll return minimum_to_decode() error
	  // (usually EIO) to reader.  It is likely an error here is due to a
	  // damaged pg.
	  rop.complete[iter->first].r = err;
	  ++is_complete;
	}
      } else {
	ceph_assert(rop.complete[iter->first].r == 0);
	if (!rop.complete[iter->first].errors.empty()) {
	  if (cct->_conf->osd_read_ec_check_for_errors) {
	    dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
	    err = rop.complete[iter->first].errors.begin()->second;
	    rop.complete[iter->first].r = err;
	  } else {
	    get_parent()->clog_warn() << "Error(s) ignored for "
				      << iter->first << " enough copies available";
	    dout(10) << __func__ << " Error(s) ignored for " << iter->first
		     << " enough copies available" << dendl;
	    rop.complete[iter->first].errors.clear();
	  }
	}
	++is_complete;
      }
    }
  }
  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
    dout(20) << __func__ << " Complete: " << rop << dendl;
    rop.trace.event("ec read complete");
    complete_read_op(rop, m);
  } else {
    dout(10) << __func__ << " readop not complete: " << rop << dendl;
  }
}
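
// Added commentary on handle_sub_read_reply() above: a read op becomes
// decodable once minimum_to_decode() succeeds on the shards gathered so
// far. With do_redundant_reads the check runs as each reply arrives (the
// first usable subset wins); otherwise it runs only once in_progress
// drains, and send_all_remaining_reads() provides one retry pass before
// the op is failed with the minimum_to_decode() error.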

void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
{
  map<hobject_t, read_request_t>::iterator reqiter =
    rop.to_read.begin();
  map<hobject_t, read_result_t>::iterator resiter =
    rop.complete.begin();
  ceph_assert(rop.to_read.size() == rop.complete.size());
  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
    if (reqiter->second.cb) {
      pair<RecoveryMessages *, read_result_t &> arg(
	m, resiter->second);
      reqiter->second.cb->complete(arg);
      reqiter->second.cb = nullptr;
    }
  }
  // if the read op is over. clean all the data of this tid.
  for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
       iter != rop.in_progress.end();
       iter++) {
    shard_to_read_map[*iter].erase(rop.tid);
  }
  rop.in_progress.clear();
  tid_to_read_map.erase(rop.tid);
}

struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
  ECBackend *ec;
  ceph_tid_t tid;
  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
  void finish(ThreadPool::TPHandle &handle) override {
    auto ropiter = ec->tid_to_read_map.find(tid);
    ceph_assert(ropiter != ec->tid_to_read_map.end());
    int priority = ropiter->second.priority;
    RecoveryMessages rm;
    ec->complete_read_op(ropiter->second, &rm);
    ec->dispatch_recovery_messages(rm, priority);
  }
};

void ECBackend::filter_read_op(
  const OSDMapRef& osdmap,
  ReadOp &op)
{
  set<hobject_t> to_cancel;
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ++i) {
    if (osdmap->is_down(i->first.osd)) {
      to_cancel.insert(i->second.begin(), i->second.end());
      op.in_progress.erase(i->first);
      continue;
    }
  }

  if (to_cancel.empty())
    return;

  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ) {
    for (set<hobject_t>::iterator j = i->second.begin();
	 j != i->second.end();
	 ) {
      if (to_cancel.count(*j))
	i->second.erase(j++);
      else
	++j;
    }
    if (i->second.empty()) {
      op.source_to_obj.erase(i++);
    } else {
      ceph_assert(!osdmap->is_down(i->first.osd));
      ++i;
    }
  }

  for (set<hobject_t>::iterator i = to_cancel.begin();
       i != to_cancel.end();
       ++i) {
    get_parent()->cancel_pull(*i);

    ceph_assert(op.to_read.count(*i));
    read_request_t &req = op.to_read.find(*i)->second;
    dout(10) << __func__ << ": canceling " << req
	     << " for obj " << *i << dendl;
    ceph_assert(req.cb);
    delete req.cb;
    req.cb = nullptr;

    op.to_read.erase(*i);
    op.complete.erase(*i);
    recovery_ops.erase(*i);
  }

  if (op.in_progress.empty()) {
    get_parent()->schedule_recovery_work(
      get_parent()->bless_unlocked_gencontext(
	new FinishReadOp(this, op.tid)));
  }
}

void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  set<ceph_tid_t> tids_to_filter;
  for (map<pg_shard_t, set<ceph_tid_t> >::iterator
	 i = shard_to_read_map.begin();
       i != shard_to_read_map.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      tids_to_filter.insert(i->second.begin(), i->second.end());
      shard_to_read_map.erase(i++);
    } else {
      ++i;
    }
  }
  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
       i != tids_to_filter.end();
       ++i) {
    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
    ceph_assert(j != tid_to_read_map.end());
    filter_read_op(osdmap, j->second);
  }
}

void ECBackend::on_change()
{
  dout(10) << __func__ << dendl;

  completed_to = eversion_t();
  committed_to = eversion_t();
  pipeline_state.clear();
  waiting_reads.clear();
  waiting_state.clear();
  waiting_commit.clear();
  for (auto &&op: tid_to_op_map) {
    cache.release_write_pin(op.second.pin);
  }
  tid_to_op_map.clear();

  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    dout(10) << __func__ << ": cancelling " << i->second << dendl;
    for (map<hobject_t, read_request_t>::iterator j =
	   i->second.to_read.begin();
	 j != i->second.to_read.end();
	 ++j) {
      delete j->second.cb;
      j->second.cb = nullptr;
    }
  }
  tid_to_read_map.clear();
  in_progress_client_reads.clear();
  shard_to_read_map.clear();
  clear_recovery_state();
}

void ECBackend::clear_recovery_state()
{
  recovery_ops.clear();
}

void ECBackend::dump_recovery_info(Formatter *f) const
{
  f->open_array_section("recovery_ops");
  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
       i != recovery_ops.end();
       ++i) {
    f->open_object_section("op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
  f->open_array_section("read_ops");
  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    f->open_object_section("read_op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
}

void ECBackend::submit_transaction(
  const hobject_t &hoid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  const vector<pg_log_entry_t> &log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef client_op
  )
{
  ceph_assert(!tid_to_op_map.count(tid));
  Op *op = &(tid_to_op_map[tid]);
  op->hoid = hoid;
  op->delta_stats = delta_stats;
  op->version = at_version;
  op->trim_to = trim_to;
  op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
  op->log_entries = log_entries;
  std::swap(op->updated_hit_set_history, hset_history);
  op->on_all_commit = on_all_commit;
  op->tid = tid;
  op->reqid = reqid;
  op->client_op = client_op;
  if (client_op)
    op->trace = client_op->pg_trace;

  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
  start_rmw(op, std::move(t));
}

void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
  if (!waiting_state.empty()) {
    waiting_state.back().on_write.emplace_back(std::move(cb));
  } else if (!waiting_reads.empty()) {
    waiting_reads.back().on_write.emplace_back(std::move(cb));
  } else {
    // Nothing earlier in the pipeline, just call it
    cb();
  }
}

void ECBackend::get_all_avail_shards(
  const hobject_t &hoid,
  const set<pg_shard_t> &error_shards,
  set<int> &have,
  map<shard_id_t, pg_shard_t> &shards,
  bool for_recovery)
{
  for (set<pg_shard_t>::const_iterator i =
	 get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (error_shards.find(*i) != error_shards.end())
      continue;
    if (!missing.is_missing(hoid)) {
      ceph_assert(!have.count(i->shard));
      have.insert(i->shard);
      ceph_assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (for_recovery) {
    for (set<pg_shard_t>::const_iterator i =
	   get_parent()->get_backfill_shards().begin();
	 i != get_parent()->get_backfill_shards().end();
	 ++i) {
      if (error_shards.find(*i) != error_shards.end())
	continue;
      if (have.count(i->shard)) {
	ceph_assert(shards.count(i->shard));
	continue;
      }
      dout(10) << __func__ << ": checking backfill " << *i << dendl;
      ceph_assert(!shards.count(i->shard));
      const pg_info_t &info = get_parent()->get_shard_info(*i);
      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
      if (hoid < info.last_backfill &&
	  !missing.is_missing(hoid)) {
	have.insert(i->shard);
	shards.insert(make_pair(i->shard, *i));
      }
    }

    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
      get_parent()->get_missing_loc_shards().find(hoid);
    if (miter != get_parent()->get_missing_loc_shards().end()) {
      for (set<pg_shard_t>::iterator i = miter->second.begin();
	   i != miter->second.end();
	   ++i) {
	dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
	auto m = get_parent()->maybe_get_shard_missing(*i);
	if (m) {
	  ceph_assert(!(*m).is_missing(hoid));
	}
	if (error_shards.find(*i) != error_shards.end())
	  continue;
	have.insert(i->shard);
	shards.insert(make_pair(i->shard, *i));
      }
    }
  }
}
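
// Added commentary on get_all_avail_shards() above: availability is
// gathered in widening circles — acting shards first, then (only for
// recovery) backfill shards that are past the object in last_backfill
// order, then missing_loc hints. error_shards is excluded at every step,
// so a shard that already returned an error is never asked again.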

int ECBackend::get_min_avail_to_read_shards(
  const hobject_t &hoid,
  const set<int> &want,
  bool for_recovery,
  bool do_redundant_reads,
  map<pg_shard_t, vector<pair<int, int>>> *to_read)
{
  // Make sure we don't do redundant reads for recovery
  ceph_assert(!for_recovery || !do_redundant_reads);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0)
    return r;

  if (do_redundant_reads) {
    vector<pair<int, int>> subchunks_list;
    subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
    for (auto &&i: have) {
      need[i] = subchunks_list;
    }
  }

  if (!to_read)
    return 0;

  for (auto &&i:need) {
    ceph_assert(shards.count(shard_id_t(i.first)));
    to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
  }
  return 0;
}
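
// Added commentary on get_min_avail_to_read_shards() above:
// minimum_to_decode() picks the cheapest shard subset that can reconstruct
// `want`. As an illustrative example (profile assumed, not from this
// file): with k=2, m=1 and shards {0,1,2} available, reading the two data
// shards {0,1} suffices; if shard 1 is unavailable, {0,2} is chosen and
// the missing chunk is decoded. With do_redundant_reads every available
// shard is requested up front, trading bandwidth for tail latency.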

int ECBackend::get_remaining_shards(
  const hobject_t &hoid,
  const set<int> &avail,
  const set<int> &want,
  const read_result_t &result,
  map<pg_shard_t, vector<pair<int, int>>> *to_read,
  bool for_recovery)
{
  ceph_assert(to_read);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;
  for (auto &p : result.errors) {
    error_shards.insert(p.first);
  }

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0) {
    dout(0) << __func__ << " not enough shards left to try for " << hoid
	    << " read result was " << result << dendl;
    return -EIO;
  }

  set<int> shards_left;
  for (auto p : need) {
    if (avail.find(p.first) == avail.end()) {
      shards_left.insert(p.first);
    }
  }

  vector<pair<int, int>> subchunks;
  subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
  for (set<int>::iterator i = shards_left.begin();
       i != shards_left.end();
       ++i) {
    ceph_assert(shards.count(shard_id_t(*i)));
    ceph_assert(avail.find(*i) == avail.end());
    to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
  }
  return 0;
}

void ECBackend::start_read_op(
  int priority,
  map<hobject_t, set<int>> &want_to_read,
  map<hobject_t, read_request_t> &to_read,
  OpRequestRef _op,
  bool do_redundant_reads,
  bool for_recovery)
{
  ceph_tid_t tid = get_parent()->get_tid();
  ceph_assert(!tid_to_read_map.count(tid));
  auto &op = tid_to_read_map.emplace(
    tid,
    ReadOp(
      priority,
      tid,
      do_redundant_reads,
      for_recovery,
      _op,
      std::move(want_to_read),
      std::move(to_read))).first->second;
  dout(10) << __func__ << ": starting " << op << dendl;
  if (_op) {
    op.trace = _op->pg_trace;
    op.trace.event("start ec read");
  }
  do_read_op(op);
}

void ECBackend::do_read_op(ReadOp &op)
{
  int priority = op.priority;
  ceph_tid_t tid = op.tid;

  dout(10) << __func__ << ": starting read " << op << dendl;

  map<pg_shard_t, ECSubRead> messages;
  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    bool need_attrs = i->second.want_attrs;

    for (auto j = i->second.need.begin();
	 j != i->second.need.end();
	 ++j) {
      if (need_attrs) {
	messages[j->first].attrs_to_read.insert(i->first);
	need_attrs = false;
      }
      messages[j->first].subchunks[i->first] = j->second;
      op.obj_to_source[i->first].insert(j->first);
      op.source_to_obj[j->first].insert(i->first);
    }
    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
	   i->second.to_read.begin();
	 j != i->second.to_read.end();
	 ++j) {
      pair<uint64_t, uint64_t> chunk_off_len =
	sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
      for (auto k = i->second.need.begin();
	   k != i->second.need.end();
	   ++k) {
	messages[k->first].to_read[i->first].push_back(
	  boost::make_tuple(
	    chunk_off_len.first,
	    chunk_off_len.second,
	    j->get<2>()));
      }
      ceph_assert(!need_attrs);
    }
  }

  std::vector<std::pair<int, Message*>> m;
  m.reserve(messages.size());
  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
       i != messages.end();
       ++i) {
    op.in_progress.insert(i->first);
    shard_to_read_map[i->first].insert(op.tid);
    i->second.tid = tid;
    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
    msg->set_priority(priority);
    msg->pgid = spg_t(
      get_parent()->whoami_spg_t().pgid,
      i->first.shard);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_interval_start_epoch();
    msg->op = i->second;
    msg->op.from = get_parent()->whoami_shard();
    msg->op.tid = tid;
    if (op.trace) {
      // initialize a child span for this shard
      msg->trace.init("ec sub read", nullptr, &op.trace);
      msg->trace.keyval("shard", i->first.shard.id);
    }
    m.push_back(std::make_pair(i->first.osd, msg));
  }
  if (!m.empty()) {
    get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
  }

  dout(10) << __func__ << ": started " << op << dendl;
}

ECUtil::HashInfoRef ECBackend::get_hash_info(
  const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
{
  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
  if (!ref) {
    dout(10) << __func__ << ": not in cache " << hoid << dendl;
    struct stat st;
    int r = store->stat(
      ch,
      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &st);
    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    // XXX: What does it mean if there is no object on disk?
    if (r >= 0) {
      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
      bufferlist bl;
      if (attrs) {
	map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
	if (k == attrs->end()) {
	  dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
	} else {
	  bl.push_back(k->second);
	}
      } else {
	r = store->getattr(
	  ch,
	  ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
	  ECUtil::get_hinfo_key(),
	  bl);
	if (r < 0) {
	  dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
	  bl.clear(); // just in case
	}
      }
      if (bl.length() > 0) {
	auto bp = bl.cbegin();
	try {
	  decode(hinfo, bp);
	} catch(...) {
	  dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
	  return ECUtil::HashInfoRef();
	}
	if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
	  dout(0) << __func__ << ": Mismatch of total_chunk_size "
		  << hinfo.get_total_chunk_size() << dendl;
	  return ECUtil::HashInfoRef();
	}
      } else if (st.st_size > 0) { // If empty object and no hinfo, create it
	return ECUtil::HashInfoRef();
      }
    }
    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  return ref;
}

void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
{
  ceph_assert(op);

  op->plan = ECTransaction::get_write_plan(
    sinfo,
    std::move(t),
    [&](const hobject_t &i) {
      ECUtil::HashInfoRef ref = get_hash_info(i, false);
      if (!ref) {
	derr << __func__ << ": get_hash_info(" << i << ")"
	     << " returned a null pointer and there is no "
	     << " way to recover from such an error in this "
	     << " context" << dendl;
	ceph_abort();
      }
      return ref;
    },
    get_parent()->get_dpp());

  dout(10) << __func__ << ": " << *op << dendl;

  waiting_state.push_back(*op);
  check_ops();
}

bool ECBackend::try_state_to_reads()
{
  if (waiting_state.empty())
    return false;

  Op *op = &(waiting_state.front());
  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    dout(20) << __func__ << ": blocking " << *op
	     << " because it requires an rmw and the cache is invalid "
	     << pipeline_state
	     << dendl;
    return false;
  }

  if (!pipeline_state.caching_enabled()) {
    op->using_cache = false;
  } else if (op->invalidates_cache()) {
    dout(20) << __func__ << ": invalidating cache after this op"
	     << dendl;
    pipeline_state.invalidate();
  }

  waiting_state.pop_front();
  waiting_reads.push_back(*op);

  if (op->using_cache) {
    cache.open_write_pin(op->pin);

    extent_set empty;
    for (auto &&hpair: op->plan.will_write) {
      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
      const extent_set &to_read_plan =
	to_read_plan_iter == op->plan.to_read.end() ?
	empty :
	to_read_plan_iter->second;

      extent_set remote_read = cache.reserve_extents_for_rmw(
	hpair.first,
	op->pin,
	hpair.second,
	to_read_plan);

      extent_set pending_read = to_read_plan;
      pending_read.subtract(remote_read);

      if (!remote_read.empty()) {
	op->remote_read[hpair.first] = std::move(remote_read);
      }
      if (!pending_read.empty()) {
	op->pending_read[hpair.first] = std::move(pending_read);
      }
    }
  } else {
    op->remote_read = op->plan.to_read;
  }

  dout(10) << __func__ << ": " << *op << dendl;

  if (!op->remote_read.empty()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    objects_read_async_no_cache(
      op->remote_read,
      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
	for (auto &&i: results) {
	  op->remote_read_result.emplace(i.first, i.second.second);
	}
	check_ops();
      });
  }

  return true;
}
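
// Added commentary on the write path: writes move through a three-stage
// pipeline driven by check_ops():
//   waiting_state  -> try_state_to_reads():  plan, pin cache, start reads
//   waiting_reads  -> try_reads_to_commit(): build and send shard writes
//   waiting_commit -> try_finish_rmw():      retire the op on last commit
// An op only advances when the stage ahead of it has drained, which is
// what keeps overlapping overwrites of the same object ordered.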
bool ECBackend::try_reads_to_commit()
{
  if (waiting_reads.empty())
    return false;
  Op *op = &(waiting_reads.front());
  if (op->read_in_progress())
    return false;
  waiting_reads.pop_front();
  waiting_commit.push_back(*op);

  dout(10) << __func__ << ": starting commit on " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  get_parent()->apply_stats(
    op->hoid,
    op->delta_stats);

  if (op->using_cache) {
    for (auto &&hpair: op->pending_read) {
      op->remote_read_result[hpair.first].insert(
	cache.get_remaining_extents_for_rmw(
	  hpair.first,
	  op->pin,
	  hpair.second));
    }
    op->pending_read.clear();
  } else {
    ceph_assert(op->pending_read.empty());
  }

  map<shard_id_t, ObjectStore::Transaction> trans;
  for (set<pg_shard_t>::const_iterator i =
	 get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    trans[i->shard];
  }

  op->trace.event("start ec write");

  map<hobject_t,extent_map> written;
  ECTransaction::generate_transactions(
    op->plan,
    ec_impl,
    get_parent()->get_info().pgid.pgid,
    sinfo,
    op->remote_read_result,
    op->log_entries,
    &written,
    &trans,
    &(op->temp_added),
    &(op->temp_cleared),
    get_parent()->get_dpp(),
    get_osdmap()->require_osd_release);

  dout(20) << __func__ << ": " << cache << dendl;
  dout(20) << __func__ << ": written: " << written << dendl;
  dout(20) << __func__ << ": op: " << *op << dendl;

  if (!get_parent()->get_pool().allows_ecoverwrites()) {
    for (auto &&i: op->log_entries) {
      if (i.requires_kraken()) {
	derr << __func__ << ": log entry " << i << " requires kraken"
	     << " but overwrites are not enabled!" << dendl;
	ceph_abort();
      }
    }
  }

  map<hobject_t,extent_set> written_set;
  for (auto &&i: written) {
    written_set[i.first] = i.second.get_interval_set();
  }
  dout(20) << __func__ << ": written_set: " << written_set << dendl;
  ceph_assert(written_set == op->plan.will_write);

  if (op->using_cache) {
    for (auto &&hpair: written) {
      dout(20) << __func__ << ": " << hpair << dendl;
      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
    }
  }
  op->remote_read.clear();
  op->remote_read_result.clear();

  ObjectStore::Transaction empty;
  bool should_write_local = false;
  ECSubWrite local_write_op;
  std::vector<std::pair<int, Message*>> messages;
  messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
  set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
  for (set<pg_shard_t>::const_iterator i =
	 get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    op->pending_apply.insert(*i);
    op->pending_commit.insert(*i);
    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
      trans.find(i->shard);
    ceph_assert(iter != trans.end());
    bool should_send = get_parent()->should_send_op(*i, op->hoid);
    const pg_stat_t &stats =
      (should_send || !backfill_shards.count(*i)) ?
      get_info().stats :
      parent->get_shard_info().find(*i)->second.stats;

    ECSubWrite sop(
      get_parent()->whoami_shard(),
      op->tid,
      op->reqid,
      op->hoid,
      stats,
      should_send ? iter->second : empty,
      op->version,
      op->trim_to,
      op->roll_forward_to,
      op->log_entries,
      op->updated_hit_set_history,
      op->temp_added,
      op->temp_cleared,
      !should_send);

    ZTracer::Trace trace;
    if (op->trace) {
      // initialize a child span for this shard
      trace.init("ec sub write", nullptr, &op->trace);
      trace.keyval("shard", i->shard.id);
    }

    if (*i == get_parent()->whoami_shard()) {
      should_write_local = true;
      local_write_op.claim(sop);
    } else {
      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
      r->map_epoch = get_osdmap_epoch();
      r->min_epoch = get_parent()->get_interval_start_epoch();
      r->trace = trace;
      messages.push_back(std::make_pair(i->osd, r));
    }
  }

  if (!messages.empty()) {
    get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
  }

  if (should_write_local) {
    handle_sub_write(
      get_parent()->whoami_shard(),
      op->client_op,
      local_write_op,
      op->trace);
  }

  for (auto i = op->on_write.begin();
       i != op->on_write.end();
       op->on_write.erase(i++)) {
    (*i)();
  }

  return true;
}
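
/* try_finish_rmw: final stage of the rmw pipeline.  Once the op at the
 * front of waiting_commit has no writes in flight, advance completed_to /
 * committed_to, optionally queue a dummy op to roll the log forward, and
 * release the op's cache pin.  Returns true if an op advanced.
 */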
bool ECBackend::try_finish_rmw()
{
  if (waiting_commit.empty())
    return false;
  Op *op = &(waiting_commit.front());
  if (op->write_in_progress())
    return false;
  waiting_commit.pop_front();

  dout(10) << __func__ << ": " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  if (op->roll_forward_to > completed_to)
    completed_to = op->roll_forward_to;
  if (op->version > committed_to)
    committed_to = op->version;

  if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
	waiting_reads.empty() &&
	waiting_commit.empty()) {
      // submit a dummy transaction to kick the rollforward
      auto tid = get_parent()->get_tid();
      Op *nop = &(tid_to_op_map[tid]);
      nop->hoid = op->hoid;
      nop->trim_to = op->trim_to;
      nop->roll_forward_to = op->version;
      nop->tid = tid;
      nop->reqid = op->reqid;
      waiting_reads.push_back(*nop);
    }
  }

  if (op->using_cache) {
    cache.release_write_pin(op->pin);
  }
  tid_to_op_map.erase(op->tid);

  if (waiting_reads.empty() &&
      waiting_commit.empty()) {
    pipeline_state.clear();
    dout(20) << __func__ << ": clearing pipeline_state "
	     << pipeline_state
	     << dendl;
  }
  return true;
}
void ECBackend::check_ops()
{
  while (try_state_to_reads() ||
	 try_reads_to_commit() ||
	 try_finish_rmw());
}
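
// Synchronous reads are not supported on an EC backend; callers must use
// the async interface below.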
int ECBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return -EOPNOTSUPP;
}
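
/* objects_read_async: round each requested (offset, length) out to full
 * stripe bounds, coalesce the results into one extent set, and hand the
 * stripe-aligned reads to objects_read_and_reconstruct.  The cb functor
 * below trims the decoded stripes back down to the caller's original
 * extents.  As an illustration (assuming a hypothetical 4+2 profile with
 * 4 KiB chunks, i.e. a 16 KiB stripe), a read of (20000, 10000) widens to
 * the stripe-aligned (16384, 16384) before decode.
 */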
void ECBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
		  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
    reads;

  uint32_t flags = 0;
  extent_set es;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
	 pair<bufferlist*, Context*> > >::const_iterator i =
	 to_read.begin();
       i != to_read.end();
       ++i) {
    pair<uint64_t, uint64_t> tmp =
      sinfo.offset_len_to_stripe_bounds(
	make_pair(i->first.get<0>(), i->first.get<1>()));

    es.union_insert(tmp.first, tmp.second);
    flags |= i->first.get<2>();
  }

  if (!es.empty()) {
    auto &offsets = reads[hoid];
    for (auto j = es.begin();
	 j != es.end();
	 ++j) {
      offsets.push_back(
	boost::make_tuple(
	  j.get_start(),
	  j.get_len(),
	  flags));
    }
  }

  struct cb {
    ECBackend *ec;
    hobject_t hoid;
    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
	      pair<bufferlist*, Context*> > > to_read;
    unique_ptr<Context> on_complete;
    cb(const cb&) = delete;
    cb(cb &&) = default;
    cb(ECBackend *ec,
       const hobject_t &hoid,
       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
		       pair<bufferlist*, Context*> > > &to_read,
       Context *on_complete)
      : ec(ec),
	hoid(hoid),
	to_read(to_read),
	on_complete(on_complete) {}
    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
      auto dpp = ec->get_parent()->get_dpp();
      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
			 << dendl;
      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
			 << dendl;

      auto &got = results[hoid];

      int r = 0;
      for (auto &&read: to_read) {
	if (got.first < 0) {
	  // propagate the first error to each per-extent completion
	  if (read.second.second) {
	    read.second.second->complete(got.first);
	  }
	  if (r == 0)
	    r = got.first;
	} else {
	  ceph_assert(read.second.first);
	  uint64_t offset = read.first.get<0>();
	  uint64_t length = read.first.get<1>();
	  auto range = got.second.get_containing_range(offset, length);
	  ceph_assert(range.first != range.second);
	  ceph_assert(range.first.get_off() <= offset);
	  ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
	  ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off()
			     << dendl;
	  ldpp_dout(dpp, 30) << "length: " << length << dendl;
	  ldpp_dout(dpp, 30) << "range length: " << range.first.get_len()
			     << dendl;
	  ceph_assert(
	    (offset + length) <=
	    (range.first.get_off() + range.first.get_len()));
	  read.second.first->substr_of(
	    range.first.get_val(),
	    offset - range.first.get_off(),
	    length);
	  if (read.second.second) {
	    read.second.second->complete(length);
	    read.second.second = nullptr;
	  }
	}
      }
      to_read.clear();
      if (on_complete) {
	on_complete.release()->complete(r);
      }
    }
    ~cb() {
      for (auto &&i: to_read) {
	delete i.second.second;
      }
      to_read.clear();
    }
  };
  objects_read_and_reconstruct(
    reads,
    fast_read,
    make_gen_lambda_context<
      map<hobject_t,pair<int, extent_map> > &&, cb>(
	cb(this,
	   hoid,
	   to_read,
	   on_complete)));
}
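
/* CallClientContexts: completion for a client read.  finish() decodes the
 * returned shard buffers stripe by stripe, trims each stripe back to the
 * extent the client actually asked for, and hands the assembled extent_map
 * to the ClientAsyncReadStatus.
 */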
struct CallClientContexts :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  hobject_t hoid;
  ECBackend *ec;
  ECBackend::ClientAsyncReadStatus *status;
  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
  CallClientContexts(
    hobject_t hoid,
    ECBackend *ec,
    ECBackend::ClientAsyncReadStatus *status,
    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override
  {
    ECBackend::read_result_t &res = in.second;
    extent_map result;
    if (res.r != 0)
      goto out;
    ceph_assert(res.returned.size() == to_read.size());
    ceph_assert(res.errors.empty());
    for (auto &&read: to_read) {
      pair<uint64_t, uint64_t> adjusted =
	ec->sinfo.offset_len_to_stripe_bounds(
	  make_pair(read.get<0>(), read.get<1>()));
      ceph_assert(res.returned.front().get<0>() == adjusted.first &&
		  res.returned.front().get<1>() == adjusted.second);
      map<int, bufferlist> to_decode;
      bufferlist bl;
      for (map<pg_shard_t, bufferlist>::iterator j =
	     res.returned.front().get<2>().begin();
	   j != res.returned.front().get<2>().end();
	   ++j) {
	to_decode[j->first.shard].claim(j->second);
      }
      int r = ECUtil::decode(
	ec->sinfo,
	ec->ec_impl,
	to_decode,
	&bl);
      if (r < 0) {
	res.r = r;
	goto out;
      }
      bufferlist trimmed;
      // trim the decoded stripe back down to the requested extent
      trimmed.substr_of(
	bl,
	read.get<0>() - adjusted.first,
	std::min(read.get<1>(),
		 bl.length() - (read.get<0>() - adjusted.first)));
      result.insert(
	read.get<0>(), trimmed.length(), std::move(trimmed));
      res.returned.pop_front();
    }
out:
    status->complete_object(hoid, res.r, std::move(result));
    ec->kick_reads();
  }
};
void ECBackend::objects_read_and_reconstruct(
  const map<hobject_t,
    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
  > &reads,
  bool fast_read,
  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
{
  in_progress_client_reads.emplace_back(
    reads.size(), std::move(func));
  if (!reads.size()) {
    kick_reads();
    return;
  }

  map<hobject_t, set<int>> obj_want_to_read;
  set<int> want_to_read;
  get_want_to_read_shards(&want_to_read);

  map<hobject_t, read_request_t> for_read_op;
  for (auto &&to_read: reads) {
    map<pg_shard_t, vector<pair<int, int>>> shards;
    int r = get_min_avail_to_read_shards(
      to_read.first,
      want_to_read,
      false,
      fast_read,
      &shards);
    ceph_assert(r == 0);

    CallClientContexts *c = new CallClientContexts(
      to_read.first,
      this,
      &(in_progress_client_reads.back()),
      to_read.second);
    for_read_op.insert(
      make_pair(
	to_read.first,
	read_request_t(
	  to_read.second,
	  shards,
	  false,
	  c)));
    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
  }

  start_read_op(
    CEPH_MSG_PRIO_DEFAULT,
    obj_want_to_read,
    for_read_op,
    OpRequestRef(),
    fast_read, false);
  return;
}
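
/* send_all_remaining_reads: after a shard read error, rebuild the read
 * request against the shards we have not yet tried so the object can still
 * be reconstructed; returns nonzero when no usable shard set remains.
 */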
int ECBackend::send_all_remaining_reads(
  const hobject_t &hoid,
  ReadOp &rop)
{
  set<int> already_read;
  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
    already_read.insert(i->shard);
  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
  map<pg_shard_t, vector<pair<int, int>>> shards;
  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
			       rop.complete[hoid], &shards, rop.for_recovery);
  if (r)
    return r;

  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
    rop.to_read.find(hoid)->second.to_read;
  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
    rop.to_read.find(hoid)->second.cb;

  // (Note cuixf) If we need to read attrs and we read failed, try to read again.
  bool want_attrs =
    rop.to_read.find(hoid)->second.want_attrs &&
    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
  if (want_attrs) {
    dout(10) << __func__ << " want attrs again" << dendl;
  }

  rop.to_read.erase(hoid);
  rop.to_read.insert(make_pair(
      hoid,
      read_request_t(
	offsets,
	shards,
	want_attrs,
	c)));
  return 0;
}
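
// objects_get_attrs: fetch the local shard's xattrs and filter out the
// hash-info keys, which are internal EC metadata rather than user attrs.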
int ECBackend::objects_get_attrs(
  const hobject_t &hoid,
  map<string, bufferlist> *out)
{
  int r = store->getattrs(
    ch,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    *out);
  if (r < 0)
    return r;

  for (map<string, bufferlist>::iterator i = out->begin();
       i != out->end();
       ) {
    if (ECUtil::is_hinfo_key_string(i->first))
      out->erase(i++);
    else
      ++i;
  }
  return r;
}
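
// rollback_append: undo an append by truncating the local shard back to
// the chunk offset corresponding to the old (stripe-aligned) object size.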
void ECBackend::rollback_append(
  const hobject_t &hoid,
  uint64_t old_size,
  ObjectStore::Transaction *t)
{
  ceph_assert(old_size % sinfo.get_stripe_width() == 0);
  t->truncate(
    coll,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    sinfo.aligned_logical_offset_to_chunk_offset(
      old_size));
}
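
/* be_deep_scrub: incrementally hash this shard's chunk data one stride at
 * a time (returning -EINPROGRESS until EOF), then compare the result
 * against the stored hash info.  The stride (osd_deep_scrub_stride) is
 * rounded up to a multiple of the chunk size so reads stay chunk-aligned.
 */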
int ECBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			   CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  if (pos.data_pos == 0) {
    pos.data_hash = bufferhash(-1);
  }

  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
  if (stride % sinfo.get_chunk_size())
    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());

  bufferlist bl;
  r = store->read(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    pos.data_pos,
    stride, bl,
    fadvise_flags);
  if (r < 0) {
    dout(20) << __func__ << " " << poid << " got "
	     << r << " on read, read_error" << dendl;
    o.read_error = true;
    return 0;
  }
  if (bl.length() % sinfo.get_chunk_size()) {
    dout(20) << __func__ << " " << poid << " got "
	     << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
	     << dendl;
    o.read_error = true;
    return 0;
  }
  if (r > 0) {
    pos.data_hash << bl;
  }
  pos.data_pos += r;
  if (r == (int)stride) {
    return -EINPROGRESS;
  }

  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
  if (!hinfo) {
    dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
    o.read_error = true;
    o.digest_present = false;
    return 0;
  } else {
    if (!get_parent()->get_pool().allows_ecoverwrites()) {
      ceph_assert(hinfo->has_chunk_hash());
      if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
	dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
		<< std::hex << pos
		<< " expected 0x" << hinfo->get_total_chunk_size() << std::dec
		<< dendl;
	o.ec_size_mismatch = true;
	return 0;
      }

      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
	  pos.data_hash.digest()) {
	dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
		<< std::hex << pos.data_hash.digest() << " != expected 0x"
		<< hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
		<< std::dec << dendl;
	o.ec_hash_mismatch = true;
	return 0;
      }

      /* We checked above that we match our own stored hash.  We cannot
       * send a hash of the actual object, so instead we simply send
       * our locally stored hash of shard 0 on the assumption that if
       * we match our chunk hash and our recollection of the hash for
       * chunk 0 matches that of our peers, there is likely no corruption.
       */
      o.digest = hinfo->get_chunk_hash(0);
      o.digest_present = true;
    } else {
      /* Hack! We must be using partial overwrites, and partial overwrites
       * don't support deep-scrub yet
       */
      o.digest = 0;
      o.digest_present = true;
    }
  }

  o.omap_digest = -1;
  o.omap_digest_present = true;
  return 0;
}