// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp>

#include "PGBackend.h"
#include "erasure-code/ErasureCodeInterface.h"
#include "ECTransaction.h"
#include "ExtentCache.h"
struct ECSubWriteReply;
struct ECSubReadReply;

struct RecoveryMessages;
class ECBackend : public PGBackend {
public:
  RecoveryHandle *open_recovery_op() override;
  int recover_object(
    const hobject_t &hoid,
    ObjectContextRef head,
    RecoveryHandle *h) override;

  bool can_handle_while_inactive(OpRequestRef op) override;
  friend struct SubWriteApplied;
  friend struct SubWriteCommitted;
  void sub_write_committed(
    eversion_t last_complete,
    const ZTracer::Trace &trace);
  void handle_sub_write(
    const ZTracer::Trace &trace);
  void handle_sub_read(
    ECSubReadReply *reply,
    const ZTracer::Trace &trace);
  void handle_sub_write_reply(
    const ECSubWriteReply &op,
    const ZTracer::Trace &trace);
  void handle_sub_read_reply(
    const ZTracer::Trace &trace);
  void check_recovery_sources(const OSDMapRef &osdmap) override;

  void on_change() override;
  void clear_recovery_state() override;

  void dump_recovery_info(Formatter *f) const override;

  void call_write_ordered(std::function<void(void)> &&cb) override;
  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_all_commit) override;
  int objects_read_sync(
    const hobject_t &hoid,
    bufferlist *bl) override;
  /**
   * Async read mechanism
   *
   * Async reads use the same async read mechanism as does recovery.
   * CallClientContexts is responsible for reconstructing the response
   * buffer as well as for calling the callbacks.
   *
   * One tricky bit is that two reads may possibly not read from the same
   * set of replicas.  This could result in two reads completing in the
   * wrong (from the interface user's point of view) order.  Thus, we
   * maintain a queue of in-progress reads (@see in_progress_client_reads)
   * to ensure that we always call the completion callback in order.
   *
   * Another subtlety is that while we may read a degraded object, we will
   * still only perform a client read from shards in the acting set.  This
   * ensures that we won't ever have to restart a client-initiated read in
   * check_recovery_sources.
   */
  void objects_read_and_reconstruct(
    const map<hobject_t,
              std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > &reads,
    GenContextURef<map<hobject_t, pair<int, extent_map> > &&> &&func);
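  /*
   * Illustrative sketch only (hypothetical caller, mirroring the
   * objects_read_async_no_cache() helper below): build a per-object list of
   * (offset, length, flags) extents and hand over a completion functor; the
   * result is an error code plus extent_map per object, delivered in
   * submission order via in_progress_client_reads.
   *
   *   map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > reads;
   *   reads[hoid].emplace_back(off, len, 0);       // hoid/off/len are assumed
   *   auto on_complete = [](map<hobject_t, pair<int, extent_map> > &&results) {
   *     // inspect results[hoid].first (error code) and .second (buffers)
   *   };
   *   objects_read_and_reconstruct(
   *     reads,
   *     make_gen_lambda_context<
   *       map<hobject_t, pair<int, extent_map> > &&, decltype(on_complete)>(
   *         std::move(on_complete)));
   */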
  friend struct CallClientContexts;
  // Tracks one in-flight client read; completed reads are drained in
  // submission order so callbacks fire in order.
  struct ClientAsyncReadStatus {
    unsigned objects_to_read;
    GenContextURef<map<hobject_t, pair<int, extent_map> > &&> func;
    map<hobject_t, pair<int, extent_map> > results;
    explicit ClientAsyncReadStatus(
      unsigned objects_to_read,
      GenContextURef<map<hobject_t, pair<int, extent_map> > &&> &&func)
      : objects_to_read(objects_to_read), func(std::move(func)) {}
    void complete_object(
      const hobject_t &hoid,
      int err,
      extent_map &&buffers) {
      ceph_assert(objects_to_read);
      --objects_to_read;
      ceph_assert(!results.count(hoid));
      results.emplace(hoid, make_pair(err, std::move(buffers)));
    }
    bool is_complete() const {
      return objects_to_read == 0;
    }
    void run() {
      func.release()->complete(std::move(results));
    }
  };
  list<ClientAsyncReadStatus> in_progress_client_reads;
  void objects_read_async(
    const hobject_t &hoid,
    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                    pair<bufferlist*, Context*> > > &to_read,
    Context *on_complete,
    bool fast_read = false) override;
  template <typename Func>
  void objects_read_async_no_cache(
    const map<hobject_t, extent_set> &to_read,
    Func &&on_complete) {
    map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
    for (auto &&hpair : to_read) {
      auto &l = _to_read[hpair.first];
      for (auto extent : hpair.second) {
        l.emplace_back(extent.first, extent.second, 0);
      }
    }
    objects_read_and_reconstruct(
      _to_read,
      make_gen_lambda_context<
        map<hobject_t, pair<int, extent_map> > &&, Func>(
          std::forward<Func>(on_complete)));
  }
  void kick_reads() {
    while (in_progress_client_reads.size() &&
           in_progress_client_reads.front().is_complete()) {
      in_progress_client_reads.front().run();
      in_progress_client_reads.pop_front();
    }
  }
  friend struct ECRecoveryHandle;
  uint64_t get_recovery_chunk_size() const {
    return round_up_to(cct->_conf->osd_recovery_max_chunk,
                       sinfo.get_stripe_width());
  }
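  // Worked example (illustrative numbers only): with osd_recovery_max_chunk
  // = 8 MiB and a stripe width of 24 KiB (say, k=6 data chunks of 4 KiB),
  // 8192 KiB is not a multiple of 24 KiB, so the recovery chunk size is
  // rounded up to 342 * 24 KiB = 8208 KiB.  Recovering in whole stripes
  // keeps each shard's read aligned to chunk boundaries.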
  void get_want_to_read_shards(set<int> *want_to_read) const {
    const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
    for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
      int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
      want_to_read->insert(chunk);
    }
  }
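  // For example (hypothetical plugin mapping): with 3 data chunks and a
  // chunk mapping of {0, 2, 4}, want_to_read becomes {0, 2, 4}; with an
  // empty mapping it falls back to the identity and yields {0, 1, 2}, i.e.
  // the shards holding data chunks 0..k-1.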
  /**
   * Recovery uses the same underlying read mechanism as client reads,
   * with the slight difference that recovery reads may come from
   * non-acting shards.  Thus, check_recovery_sources may wind up calling
   * cancel_pull for a read originating with RecoveryOp.
   *
   * The recovery process is expressed as a state machine:
   * - IDLE: Nothing is currently in progress; reads will be started and
   *         we will transition to READING.
   * - READING: We are awaiting a pending read op.  Once complete, we will
   *            decode the buffers and proceed to WRITING.
   * - WRITING: We are awaiting a completed push.  Once complete, we will
   *            either transition to COMPLETE or to IDLE to continue.
   * - COMPLETE: complete.
   *
   * We use the existing Push and PushReply messages and structures to
   * handle actually shuffling the data over to the replicas.  recovery_info
   * and recovery_progress are expressed in terms of the logical offset
   * space, except for data_included, which is in terms of the chunked
   * object space (to match the passed buffer).
   *
   * xattrs are requested on the first read and used to initialize the
   * object_context if missing on completion of the first read.
   *
   * In order to batch up reads and writes, we batch Push, PushReply,
   * Transaction, and reads in a RecoveryMessages object which is passed
   * among the recovery methods.
   */
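  /*
   * Rough shape of one pass, as a sketch (the actual driver is
   * continue_recovery_op(), declared below and implemented in ECBackend.cc):
   *
   *   IDLE    -> issue shard reads for the next extent            -> READING
   *   READING -> decode the returned shard buffers and queue a
   *              Push to every shard still missing the object     -> WRITING
   *   WRITING -> collect a PushReply from each of those shards    -> IDLE
   *              (more extents to do) or COMPLETE (object done)
   *
   * The reads, Pushes, and PushReplies generated during a pass are
   * accumulated in a RecoveryMessages batch and sent out together by
   * dispatch_recovery_messages().
   */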
  struct RecoveryOp {
    set<pg_shard_t> missing_on;
    set<shard_id_t> missing_on_shards;

    ObjectRecoveryInfo recovery_info;
    ObjectRecoveryProgress recovery_progress;

    enum state_t { IDLE, READING, WRITING, COMPLETE } state;
    static const char* tostr(state_t state) {
      switch (state) {
      case ECBackend::RecoveryOp::IDLE:
        return "IDLE";
      case ECBackend::RecoveryOp::READING:
        return "READING";
      case ECBackend::RecoveryOp::WRITING:
        return "WRITING";
      case ECBackend::RecoveryOp::COMPLETE:
        return "COMPLETE";
      }
    }
    // must be filled if state == WRITING
    map<int, bufferlist> returned_data;
    map<string, bufferlist> xattrs;
    ECUtil::HashInfoRef hinfo;
    ObjectContextRef obc;
    set<pg_shard_t> waiting_on_pushes;

    // valid in state READING
    pair<uint64_t, uint64_t> extent_requested;

    void dump(Formatter *f) const;

    RecoveryOp() : state(IDLE) {}
  };
  friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
  map<hobject_t, RecoveryOp> recovery_ops;
  void continue_recovery_op(
    RecoveryOp &op,
    RecoveryMessages *m);
  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
  friend struct OnRecoveryReadComplete;
  void handle_recovery_read_complete(
    const hobject_t &hoid,
    boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
    boost::optional<map<string, bufferlist> > attrs,
    RecoveryMessages *m);
  void handle_recovery_push(
    const PushOp &op,
    RecoveryMessages *m);
  void handle_recovery_push_reply(
    const PushReplyOp &op,
    RecoveryMessages *m);
  void get_all_avail_shards(
    const hobject_t &hoid,
    const set<pg_shard_t> &error_shards,
    map<shard_id_t, pg_shard_t> &shards,
    bool for_recovery);
  /**
   * Low level async read mechanism
   *
   * To avoid duplicating the logic for requesting and waiting for
   * multiple object shards, there is a common async read mechanism
   * taking a map of hobject_t->read_request_t, which defines callbacks
   * taking read_result_ts as arguments.
   *
   * tid_to_read_map gives open read ops.  check_recovery_sources uses
   * shard_to_read_map and ReadOp::source_to_obj to restart reads
   * involving down osds.
   *
   * The user is responsible for specifying replicas on which to read
   * and for reassembling the buffer on the other side, since client
   * reads require the original object buffer while recovery only needs
   * the missing pieces.
   *
   * Rather than handling reads on the primary directly, we simply send
   * ourselves a message.  This avoids a dedicated primary path for that
   * part.
   */
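  /*
   * Minimal sketch of a caller (hypothetical; names like `hoid`, `to_read`,
   * `need` and `cb` are assumed): each object gets a read_request_t naming
   * the extents to read, the shards (and subchunks) to read them from, and
   * a callback that will be handed the read_result_t.
   *
   *   map<hobject_t, read_request_t> requests;
   *   requests.emplace(
   *     hoid,
   *     read_request_t(
   *       to_read,   // list of (offset, length, flags) tuples
   *       need,      // pg_shard_t -> subchunks, e.g. from
   *                  // get_min_avail_to_read_shards()
   *       false,     // want_attrs
   *       cb));      // GenContext<pair<RecoveryMessages*, read_result_t&> &>*
   *
   * The resulting ReadOp lives in tid_to_read_map until every shard has
   * replied or errored out, at which point cb->complete() is invoked with
   * the accumulated read_result_t.
   */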
  struct read_result_t {
    int r;
    map<pg_shard_t, int> errors;
    boost::optional<map<string, bufferlist> > attrs;
    list<
      boost::tuple<
        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > returned;
    read_result_t() : r(0) {}
  };
  struct read_request_t {
    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    const map<pg_shard_t, vector<pair<int, int>>> need;
    const bool want_attrs;
    GenContext<pair<RecoveryMessages *, read_result_t &> &> *cb;
    read_request_t(
      const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
      const map<pg_shard_t, vector<pair<int, int>>> &need,
      bool want_attrs,
      GenContext<pair<RecoveryMessages *, read_result_t &> &> *cb)
      : to_read(to_read), need(need), want_attrs(want_attrs),
        cb(cb) {}
  };
  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
  struct ReadOp {
    int priority;
    ceph_tid_t tid;
    OpRequestRef op; // may be null if not on behalf of a client
    // True if redundant reads are issued, false otherwise;
    // this is useful to trade some resources (redundant ops) for
    // lower read latency, especially on a relatively idle cluster.
    bool do_redundant_reads;
    // True if reading for recovery, which may read only a subset
    // of the available shards.
    bool for_recovery;

    ZTracer::Trace trace;

    map<hobject_t, set<int>> want_to_read;
    map<hobject_t, read_request_t> to_read;
    map<hobject_t, read_result_t> complete;

    map<hobject_t, set<pg_shard_t>> obj_to_source;
    map<pg_shard_t, set<hobject_t> > source_to_obj;

    void dump(Formatter *f) const;

    set<pg_shard_t> in_progress;
    ReadOp(
      int priority,
      ceph_tid_t tid,
      bool do_redundant_reads,
      bool for_recovery,
      OpRequestRef op,
      map<hobject_t, set<int>> &&_want_to_read,
      map<hobject_t, read_request_t> &&_to_read)
      : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
        for_recovery(for_recovery), want_to_read(std::move(_want_to_read)),
        to_read(std::move(_to_read)) {
      // Pre-populate the result list with one (offset, length, empty shard
      // map) entry per requested extent.
      for (auto &&hpair : to_read) {
        auto &returned = complete[hpair.first].returned;
        for (auto &&extent : hpair.second.to_read) {
          returned.push_back(
            boost::make_tuple(
              extent.get<0>(),
              extent.get<1>(),
              map<pg_shard_t, bufferlist>()));
        }
      }
    }
    ReadOp(const ReadOp &) = default;
    ReadOp(ReadOp &&) = default;
  };
  friend struct FinishReadOp;
  void filter_read_op(
    const OSDMapRef &osdmap,
    ReadOp &op);
  void complete_read_op(ReadOp &rop, RecoveryMessages *m);
  friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
  map<ceph_tid_t, ReadOp> tid_to_read_map;
  map<pg_shard_t, set<ceph_tid_t> > shard_to_read_map;
  void start_read_op(
    map<hobject_t, set<int>> &want_to_read,
    map<hobject_t, read_request_t> &to_read,
    bool do_redundant_reads, bool for_recovery);

  void do_read_op(ReadOp &rop);
  int send_all_remaining_reads(
    const hobject_t &hoid,
    ReadOp &rop);
  /**
   * ECTransaction is responsible for generating a transaction for
   * each shard to which we need to send the write.  As required
   * by the PGBackend interface, the ECBackend write mechanism
   * passes trim information with the write and last_complete back
   * with the reply.
   *
   * As with client reads, there is a possibility of out-of-order
   * completions.  Thus, callbacks and completion are called in order
   * on the writing list.
   */
  struct Op : boost::intrusive::list_base_hook<> {
    /// From submit_transaction caller, describes operation
    object_stat_sum_t delta_stats;
    boost::optional<pg_hit_set_history_t> updated_hit_set_history;
    vector<pg_log_entry_t> log_entries;
    ZTracer::Trace trace;

    eversion_t roll_forward_to; /// Soon to be generated internally

    /// Ancillary also provided from submit_transaction caller
    map<hobject_t, ObjectContextRef> obc_map;

    /// see call_write_ordered
    std::list<std::function<void(void)> > on_write;

    /// Generated internally
    set<hobject_t> temp_added;
    set<hobject_t> temp_cleared;

    ECTransaction::WritePlan plan;
    bool requires_rmw() const { return !plan.to_read.empty(); }
    bool invalidates_cache() const { return plan.invalidates_cache; }

    // must be true if requires_rmw(), must be false if invalidates_cache()
    bool using_cache = true;

    /// In progress read state
    map<hobject_t, extent_set> pending_read; // subset already being read
    map<hobject_t, extent_set> remote_read;  // subset we must read
    map<hobject_t, extent_map> remote_read_result;
    bool read_in_progress() const {
      return !remote_read.empty() && remote_read_result.empty();
    }

    /// In progress write state
    set<pg_shard_t> pending_commit;
    // We need pending_apply for pre-mimic peers so that we don't issue a
    // read on a remote shard before it has applied a previous write.  We can
    // remove this after nautilus.
    set<pg_shard_t> pending_apply;
    bool write_in_progress() const {
      return !pending_commit.empty() || !pending_apply.empty();
    }

    /// optional, may be null, for tracking purposes
    OpRequestRef client_op;

    ExtentCache::write_pin pin;

    Context *on_all_commit = nullptr;
    ~Op() {
      delete on_all_commit;
    }
  };
  using op_list = boost::intrusive::list<Op>;
  friend ostream &operator<<(ostream &lhs, const Op &rhs);

  map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
  /**
   * We model the possible rmw states as a set of waitlists.
   * All writes at this time complete in order, so a write blocked
   * at waiting_state blocks all writes behind it as well (same for
   * the other states).
   *
   * Future work: We can break this up into a per-object pipeline
   * (almost).  First, provide an ordering token to submit_transaction
   * and require that all operations within a single transaction take
   * place on a subset of hobject_t space partitioned by that token
   * (the hashid seems about right to me -- it even works for temp objects
   * if you recall that a temp object created for object head foo will
   * only ever be referenced by other transactions on foo and isn't
   * reused).  Next, factor this part into a class and maintain one per
   * ordering token.  Next, fix up PrimaryLogPG's repop queue to be
   * partitioned by ordering token.  Finally, refactor the op pipeline
   * so that the log entries passed into submit_transaction aren't
   * versioned.  We can't assign versions to them until we actually
   * submit the operation.  That's probably going to be the hard part.
   */
  class pipeline_state_t {
    enum {
      CACHE_VALID = 0,
      CACHE_INVALID = 1
    } pipeline_state = CACHE_VALID;
  public:
    bool caching_enabled() const {
      return pipeline_state == CACHE_VALID;
    }
    bool cache_invalid() const {
      return !caching_enabled();
    }
    void invalidate() {
      pipeline_state = CACHE_INVALID;
    }
    void clear() {
      pipeline_state = CACHE_VALID;
    }
    friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
  } pipeline_state;
  op_list waiting_state;  /// writes waiting on pipeline_state
  op_list waiting_reads;  /// writes waiting on partial stripe reads
  op_list waiting_commit; /// writes waiting on initial commit
  eversion_t completed_to;
  eversion_t committed_to;
  void start_rmw(Op *op, PGTransactionUPtr &&t);
  bool try_state_to_reads();
  bool try_reads_to_commit();
  bool try_finish_rmw();
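  /*
   * Illustrative flow only (the real driver lives in ECBackend.cc; the name
   * below is hypothetical): an Op enters waiting_state from start_rmw() and
   * is advanced one list at a time for as long as any stage makes progress.
   *
   *   void check_pipeline_sketch() {
   *     while (try_state_to_reads() ||   // waiting_state  -> waiting_reads
   *            try_reads_to_commit() ||  // waiting_reads  -> waiting_commit
   *            try_finish_rmw())         // waiting_commit -> done
   *       ;
   *   }
   *
   * Because each list is drained strictly in order, a write stalled in an
   * earlier stage blocks everything queued behind it, as described above.
   */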
  ErasureCodeInterfaceRef ec_impl;
  /**
   * Determines whether _have is sufficient to recover an object
   */
  class ECRecPred : public IsPGRecoverablePredicate {
    set<int> want;
    ErasureCodeInterfaceRef ec_impl;
  public:
    explicit ECRecPred(ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) {
      for (unsigned i = 0; i < ec_impl->get_chunk_count(); ++i) {
        want.insert(i);
      }
    }
    bool operator()(const set<pg_shard_t> &_have) const override {
      set<int> have;
      for (set<pg_shard_t>::const_iterator i = _have.begin();
           i != _have.end();
           ++i) {
        have.insert(i->shard);
      }
      map<int, vector<pair<int, int>>> min;
      return ec_impl->minimum_to_decode(want, have, &min) == 0;
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() const override {
    return new ECRecPred(ec_impl);
  }
  int get_ec_data_chunk_count() const override {
    return ec_impl->get_data_chunk_count();
  }
  int get_ec_stripe_chunk_size() const override {
    return sinfo.get_chunk_size();
  }
  /**
   * Determines whether _have is sufficient to read an object
   */
  class ECReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
    ECRecPred rec_pred;
  public:
    ECReadPred(
      pg_shard_t whoami,
      ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {}
    bool operator()(const set<pg_shard_t> &_have) const override {
      return _have.count(whoami) && rec_pred(_have);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() const override {
    return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
  }
  const ECUtil::stripe_info_t sinfo;
  /// If modified, ensure that the ref is held until the update is applied
  SharedPtrRegistry<hobject_t, ECUtil::HashInfo> unstable_hashinfo_registry;
  ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid, bool checks = true,
                                    const map<string, bufferptr> *attr = NULL);
  ECBackend(
    PGBackend::Listener *pg,
    ObjectStore::CollectionHandle &ch,
    ErasureCodeInterfaceRef ec_impl,
    uint64_t stripe_width);
  /// Returns to_read replicas sufficient to reconstruct want
  int get_min_avail_to_read_shards(
    const hobject_t &hoid,     ///< [in] object
    const set<int> &want,      ///< [in] desired shards
    bool for_recovery,         ///< [in] true if we may use non-acting replicas
    bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
    map<pg_shard_t, vector<pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
    ); ///< @return error code, 0 on success
  int get_remaining_shards(
    const hobject_t &hoid,
    const set<int> &avail,
    const set<int> &want,
    const read_result_t &result,
    map<pg_shard_t, vector<pair<int, int>>> *to_read,
    bool for_recovery);
  int objects_get_attrs(
    const hobject_t &hoid,
    map<string, bufferlist> *out) override;

  void rollback_append(
    const hobject_t &hoid,
    ObjectStore::Transaction *t) override;

  bool auto_repair_supported() const override { return true; }
  int be_deep_scrub(
    const hobject_t &poid,
    ScrubMapBuilder &pos,
    ScrubMap::object &o) override;
  uint64_t be_get_ondisk_size(uint64_t logical_size) override {
    return sinfo.logical_to_next_chunk_offset(logical_size);
  }
  void _failed_push(const hobject_t &hoid,
                    pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
};

ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);