// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
 *
 * Author: Loic Dachary <loic@dachary.org>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
17 #ifndef CEPH_REPLICATEDPG_H
18 #define CEPH_REPLICATEDPG_H
20 #include <boost/tuple/tuple.hpp>
21 #include "include/assert.h"
24 #include "TierAgentState.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/Checksummer.h"
27 #include "common/sharedptr_registry.hpp"
28 #include "ReplicatedBackend.h"
29 #include "PGTransaction.h"
31 class CopyFromCallback
;
32 class PromoteCallback
;
37 struct TierAgentState
;
42 void intrusive_ptr_add_ref(PrimaryLogPG
*pg
);
43 void intrusive_ptr_release(PrimaryLogPG
*pg
);
44 uint64_t get_with_id(PrimaryLogPG
*pg
);
45 void put_with_id(PrimaryLogPG
*pg
, uint64_t id
);
48 typedef TrackedIntPtr
<PrimaryLogPG
> PrimaryLogPGRef
;
50 typedef boost::intrusive_ptr
<PrimaryLogPG
> PrimaryLogPGRef
;
53 struct inconsistent_snapset_wrapper
;
55 class PrimaryLogPG
: public PG
, public PGBackend::Listener
{
60 MEMPOOL_CLASS_HELPERS();
63 * state associated with a copy operation
69 * CopyResults stores the object metadata of interest to a copy initiator.
72 ceph::real_time mtime
; ///< the copy source's mtime
73 uint64_t object_size
; ///< the copied object's size
74 bool started_temp_obj
; ///< true if the callback needs to delete temp object
75 hobject_t temp_oid
; ///< temp object (if any)
78 * Function to fill in transaction; if non-empty the callback
79 * must execute it before any other accesses to the object
80 * (in order to complete the copy).
82 std::function
<void(PGTransaction
*)> fill_in_final_tx
;
84 version_t user_version
; ///< The copy source's user version
85 bool should_requeue
; ///< op should be requeued on cancel
86 vector
<snapid_t
> snaps
; ///< src's snaps (if clone)
87 snapid_t snap_seq
; ///< src's snap_seq (if head)
88 librados::snap_set_t snapset
; ///< src snapset (if head)
91 uint32_t flags
; // object_copy_data_t::FLAG_*
92 uint32_t source_data_digest
, source_omap_digest
;
93 uint32_t data_digest
, omap_digest
;
94 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > reqids
; // [(reqid, user_version)]
95 map
<string
, bufferlist
> attrs
; // xattrs
96 uint64_t truncate_seq
;
97 uint64_t truncate_size
;
98 bool is_data_digest() {
99 return flags
& object_copy_data_t::FLAG_DATA_DIGEST
;
101 bool is_omap_digest() {
102 return flags
& object_copy_data_t::FLAG_OMAP_DIGEST
;
105 : object_size(0), started_temp_obj(false),
107 should_requeue(false), mirror_snapset(false),
110 source_data_digest(-1), source_omap_digest(-1),
111 data_digest(-1), omap_digest(-1),
112 truncate_seq(0), truncate_size(0)
118 ObjectContextRef obc
;
120 object_locator_t oloc
;
126 ceph_tid_t objecter_tid
;
127 ceph_tid_t objecter_tid2
;
129 object_copy_cursor_t cursor
;
130 map
<string
,bufferlist
> attrs
;
132 bufferlist omap_header
;
133 bufferlist omap_data
;
136 object_copy_cursor_t temp_cursor
;
139 * For CopyOp the process is:
140 * step1: read the data(attr/omap/data) from the source object
141 * step2: handle those data(w/ those data create a new object)
142 * src_obj_fadvise_flags used in step1;
143 * dest_obj_fadvise_flags used in step2
145 unsigned src_obj_fadvise_flags
;
146 unsigned dest_obj_fadvise_flags
;
148 CopyOp(CopyCallback
*cb_
, ObjectContextRef _obc
, hobject_t s
,
153 unsigned src_obj_fadvise_flags
,
154 unsigned dest_obj_fadvise_flags
)
155 : cb(cb_
), obc(_obc
), src(s
), oloc(l
), flags(f
),
160 src_obj_fadvise_flags(src_obj_fadvise_flags
),
161 dest_obj_fadvise_flags(dest_obj_fadvise_flags
)
163 results
.user_version
= v
;
164 results
.mirror_snapset
= mirror_snapset
;
167 typedef ceph::shared_ptr
<CopyOp
> CopyOpRef
;
170 * The CopyCallback class defines an interface for completions to the
171 * copy_start code. Users of the copy infrastructure must implement
172 * one and give an instance of the class to start_copy.
174 * The implementer is responsible for making sure that the CopyCallback
175 * can associate itself with the correct copy operation.
177 typedef boost::tuple
<int, CopyResults
*> CopyCallbackResults
;
179 friend class CopyFromCallback
;
180 friend class PromoteCallback
;
185 ceph_tid_t objecter_tid
;
187 version_t user_version
;
189 bool canceled
; ///< true if canceled
191 ProxyReadOp(OpRequestRef _op
, hobject_t oid
, vector
<OSDOp
>& _ops
)
192 : op(_op
), soid(oid
),
193 objecter_tid(0), ops(_ops
),
194 user_version(0), data_offset(0),
197 typedef ceph::shared_ptr
<ProxyReadOp
> ProxyReadOpRef
;
199 struct ProxyWriteOp
{
203 ceph_tid_t objecter_tid
;
205 version_t user_version
;
211 ProxyWriteOp(OpRequestRef _op
, hobject_t oid
, vector
<OSDOp
>& _ops
, osd_reqid_t _reqid
)
212 : ctx(NULL
), op(_op
), soid(oid
),
213 objecter_tid(0), ops(_ops
),
214 user_version(0), sent_reply(false),
218 typedef ceph::shared_ptr
<ProxyWriteOp
> ProxyWriteOpRef
;
221 ObjectContextRef obc
; ///< obc we are flushing
222 OpRequestRef op
; ///< initiating op
223 list
<OpRequestRef
> dup_ops
; ///< bandwagon jumpers
224 version_t flushed_version
; ///< user version we are flushing
225 ceph_tid_t objecter_tid
; ///< copy-from request tid
226 int rval
; ///< copy-from result
227 bool blocking
; ///< whether we are blocking updates
228 bool removal
; ///< we are removing the backend object
229 boost::optional
<std::function
<void()>> on_flush
; ///< callback, may be null
232 : flushed_version(0), objecter_tid(0), rval(0),
233 blocking(false), removal(false) {}
234 ~FlushOp() { assert(!on_flush
); }
236 typedef ceph::shared_ptr
<FlushOp
> FlushOpRef
;
238 boost::scoped_ptr
<PGBackend
> pgbackend
;
239 PGBackend
*get_pgbackend() override
{
240 return pgbackend
.get();
244 DoutPrefixProvider
*get_dpp() override
{
248 void on_local_recover(
249 const hobject_t
&oid
,
250 const ObjectRecoveryInfo
&recovery_info
,
251 ObjectContextRef obc
,
252 ObjectStore::Transaction
*t
254 void on_peer_recover(
256 const hobject_t
&oid
,
257 const ObjectRecoveryInfo
&recovery_info
259 void begin_peer_recover(
261 const hobject_t oid
) override
;
262 void on_global_recover(
263 const hobject_t
&oid
,
264 const object_stat_sum_t
&stat_diff
) override
;
265 void failed_push(const list
<pg_shard_t
> &from
, const hobject_t
&soid
) override
;
266 void primary_failed(const hobject_t
&soid
) override
;
267 bool primary_error(const hobject_t
& soid
, eversion_t v
) override
;
268 void cancel_pull(const hobject_t
&soid
) override
;
270 const hobject_t
&soid
,
271 const object_stat_sum_t
&delta_stats
) override
;
272 void on_primary_error(const hobject_t
&oid
, eversion_t v
) override
;
274 template<class T
> class BlessedGenContext
;
275 class BlessedContext
;
276 Context
*bless_context(Context
*c
) override
;
278 GenContext
<ThreadPool::TPHandle
&> *bless_gencontext(
279 GenContext
<ThreadPool::TPHandle
&> *c
) override
;
281 void send_message(int to_osd
, Message
*m
) override
{
282 osd
->send_message_osd_cluster(to_osd
, m
, get_osdmap()->get_epoch());
284 void queue_transaction(ObjectStore::Transaction
&& t
,
285 OpRequestRef op
) override
{
286 osd
->store
->queue_transaction(osr
.get(), std::move(t
), 0, 0, 0, op
);
288 void queue_transactions(vector
<ObjectStore::Transaction
>& tls
,
289 OpRequestRef op
) override
{
290 osd
->store
->queue_transactions(osr
.get(), tls
, 0, 0, 0, op
, NULL
);
292 epoch_t
get_epoch() const override
{
293 return get_osdmap()->get_epoch();
295 epoch_t
get_interval_start_epoch() const override
{
296 return info
.history
.same_interval_since
;
298 epoch_t
get_last_peering_reset_epoch() const override
{
299 return get_last_peering_reset();
301 const set
<pg_shard_t
> &get_actingbackfill_shards() const override
{
302 return actingbackfill
;
304 const set
<pg_shard_t
> &get_acting_shards() const override
{
307 const set
<pg_shard_t
> &get_backfill_shards() const override
{
308 return backfill_targets
;
311 std::string
gen_dbg_prefix() const override
{ return gen_prefix(); }
313 const map
<hobject_t
, set
<pg_shard_t
>>
314 &get_missing_loc_shards() const override
{
315 return missing_loc
.get_missing_locs();
317 const map
<pg_shard_t
, pg_missing_t
> &get_shard_missing() const override
{
320 using PGBackend::Listener::get_shard_missing
;
321 const map
<pg_shard_t
, pg_info_t
> &get_shard_info() const override
{
324 using PGBackend::Listener::get_shard_info
;
325 const pg_missing_tracker_t
&get_local_missing() const override
{
326 return pg_log
.get_missing();
328 const PGLog
&get_log() const override
{
331 bool pgb_is_primary() const override
{
334 OSDMapRef
pgb_get_osdmap() const override
{
337 const pg_info_t
&get_info() const override
{
340 const pg_pool_t
&get_pool() const override
{
344 ObjectContextRef
get_obc(
345 const hobject_t
&hoid
,
346 const map
<string
, bufferlist
> &attrs
) override
{
347 return get_object_context(hoid
, true, &attrs
);
350 bool try_lock_for_read(
351 const hobject_t
&hoid
,
352 ObcLockManager
&manager
) override
{
353 if (is_missing_object(hoid
))
355 auto obc
= get_object_context(hoid
, false, nullptr);
358 return manager
.try_get_read_lock(hoid
, obc
);
361 void release_locks(ObcLockManager
&manager
) override
{
362 release_object_locks(manager
);
365 void pgb_set_object_snap_mapping(
366 const hobject_t
&soid
,
367 const set
<snapid_t
> &snaps
,
368 ObjectStore::Transaction
*t
) override
{
369 return update_object_snap_mapping(t
, soid
, snaps
);
371 void pgb_clear_object_snap_mapping(
372 const hobject_t
&soid
,
373 ObjectStore::Transaction
*t
) override
{
374 return clear_object_snap_mapping(t
, soid
);
378 const vector
<pg_log_entry_t
> &logv
,
379 const boost::optional
<pg_hit_set_history_t
> &hset_history
,
380 const eversion_t
&trim_to
,
381 const eversion_t
&roll_forward_to
,
382 bool transaction_applied
,
383 ObjectStore::Transaction
&t
) override
{
385 info
.hit_set
= *hset_history
;
387 append_log(logv
, trim_to
, roll_forward_to
, t
, transaction_applied
);
390 struct C_OSD_OnApplied
;
392 const eversion_t
&applied_version
) override
;
396 const hobject_t
&hoid
) override
{
397 if (peer
== get_primary())
399 assert(peer_info
.count(peer
));
401 hoid
.pool
!= (int64_t)info
.pgid
.pool() ||
402 hoid
<= last_backfill_started
||
403 hoid
<= peer_info
[peer
].last_backfill
;
405 assert(is_backfill_targets(peer
));
409 void update_peer_last_complete_ondisk(
411 eversion_t lcod
) override
{
412 peer_last_complete_ondisk
[fromosd
] = lcod
;
415 void update_last_complete_ondisk(
416 eversion_t lcod
) override
{
417 last_complete_ondisk
= lcod
;
421 const pg_stat_t
&stat
) override
{
425 void schedule_recovery_work(
426 GenContext
<ThreadPool::TPHandle
&> *c
) override
;
428 pg_shard_t
whoami_shard() const override
{
431 spg_t
primary_spg_t() const override
{
432 return spg_t(info
.pgid
.pgid
, primary
.shard
);
434 pg_shard_t
primary_shard() const override
{
437 uint64_t min_peer_features() const override
{
438 return get_min_peer_features();
441 void send_message_osd_cluster(
442 int peer
, Message
*m
, epoch_t from_epoch
) override
;
443 void send_message_osd_cluster(
444 Message
*m
, Connection
*con
) override
;
445 void send_message_osd_cluster(
446 Message
*m
, const ConnectionRef
& con
) override
;
447 ConnectionRef
get_con_osd_cluster(int peer
, epoch_t from_epoch
) override
;
448 entity_name_t
get_cluster_msgr_name() override
{
449 return osd
->get_cluster_msgr_name();
452 PerfCounters
*get_logger() override
;
454 ceph_tid_t
get_tid() override
{ return osd
->get_tid(); }
456 LogClientTemp
clog_error() override
{ return osd
->clog
->error(); }
458 struct watch_disconnect_t
{
461 bool send_disconnect
;
462 watch_disconnect_t(uint64_t c
, entity_name_t n
, bool sd
)
463 : cookie(c
), name(n
), send_disconnect(sd
) {}
465 void complete_disconnect_watches(
466 ObjectContextRef obc
,
467 const list
<watch_disconnect_t
> &to_disconnect
);
470 * Capture all object state associated with an in-progress read or write.
477 const ObjectState
*obs
; // Old objectstate
478 const SnapSet
*snapset
; // Old snapset
480 ObjectState new_obs
; // resulting ObjectState
481 SnapSet new_snapset
; // resulting SnapSet (in case of a write)
482 //pg_stat_t new_stats; // resulting Stats
483 object_stat_sum_t delta_stats
;
485 bool modify
; // (force) modification (even if op_t is empty)
486 bool user_modify
; // user-visible modification
487 bool undirty
; // user explicitly un-dirtying this object
488 bool cache_evict
; ///< true if this is a cache eviction
489 bool ignore_cache
; ///< true if IGNORE_CACHE flag is set
490 bool ignore_log_op_stats
; // don't log op stats
491 bool update_log_only
; ///< this is a write that returned an error - just record in pg log for dup detection
494 list
<pair
<watch_info_t
,bool> > watch_connects
; ///< new watch + will_ping flag
495 list
<watch_disconnect_t
> watch_disconnects
; ///< old watch + send_discon
496 list
<notify_info_t
> notifies
;
498 boost::optional
<uint64_t> watch_cookie
;
501 explicit NotifyAck(uint64_t notify_id
) : notify_id(notify_id
) {}
502 NotifyAck(uint64_t notify_id
, uint64_t cookie
, bufferlist
& rbl
)
503 : watch_cookie(cookie
), notify_id(notify_id
) {
507 list
<NotifyAck
> notify_acks
;
509 uint64_t bytes_written
, bytes_read
;
512 SnapContext snapc
; // writer snap context
513 eversion_t at_version
; // pg's current version pointer
514 version_t user_at_version
; // pg's current user version pointer
516 int current_osd_subop_num
;
518 PGTransactionUPtr op_t
;
519 vector
<pg_log_entry_t
> log
;
520 boost::optional
<pg_hit_set_history_t
> updated_hset_history
;
522 interval_set
<uint64_t> modified_ranges
;
523 ObjectContextRef obc
;
524 ObjectContextRef clone_obc
; // if we created a clone
525 ObjectContextRef snapset_obc
; // if we created/deleted a snapdir
527 int data_off
; // FIXME: we may want to kill this msgr hint off at some point!
533 int num_read
; ///< count read ops
534 int num_write
; ///< count update ops
536 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > extra_reqids
;
538 CopyFromCallback
*copy_cb
;
540 hobject_t new_temp_oid
, discard_temp_oid
; ///< temp objects we should start/stop tracking
542 list
<std::function
<void()>> on_applied
;
543 list
<std::function
<void()>> on_committed
;
544 list
<std::function
<void()>> on_finish
;
545 list
<std::function
<void()>> on_success
;
546 template <typename F
>
547 void register_on_finish(F
&&f
) {
548 on_finish
.emplace_back(std::forward
<F
>(f
));
550 template <typename F
>
551 void register_on_success(F
&&f
) {
552 on_success
.emplace_back(std::forward
<F
>(f
));
554 template <typename F
>
555 void register_on_applied(F
&&f
) {
556 on_applied
.emplace_back(std::forward
<F
>(f
));
558 template <typename F
>
559 void register_on_commit(F
&&f
) {
560 on_committed
.emplace_back(std::forward
<F
>(f
));
565 // pending async reads <off, len, op_flags> -> <outbl, outr>
566 list
<pair
<boost::tuple
<uint64_t, uint64_t, unsigned>,
567 pair
<bufferlist
*, Context
*> > > pending_async_reads
;
568 int async_read_result
;
570 friend struct OnReadComplete
;
571 void start_async_reads(PrimaryLogPG
*pg
);
572 void finish_read(PrimaryLogPG
*pg
);
573 bool async_reads_complete() {
574 return inflightreads
== 0;
577 ObjectContext::RWState::State lock_type
;
578 ObcLockManager lock_manager
;
580 OpContext(const OpContext
& other
);
581 const OpContext
& operator=(const OpContext
& other
);
583 OpContext(OpRequestRef _op
, osd_reqid_t _reqid
, vector
<OSDOp
>& _ops
,
584 ObjectContextRef
& obc
,
586 op(_op
), reqid(_reqid
), ops(_ops
),
589 new_obs(obs
->oi
, obs
->exists
),
590 modify(false), user_modify(false), undirty(false), cache_evict(false),
591 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
592 bytes_written(0), bytes_read(0), user_at_version(0),
593 current_osd_subop_num(0),
595 data_off(0), reply(NULL
), pg(_pg
),
600 async_read_result(0),
602 lock_type(ObjectContext::RWState::RWNONE
) {
604 new_snapset
= obc
->ssc
->snapset
;
605 snapset
= &obc
->ssc
->snapset
;
608 OpContext(OpRequestRef _op
, osd_reqid_t _reqid
,
609 vector
<OSDOp
>& _ops
, PrimaryLogPG
*_pg
) :
610 op(_op
), reqid(_reqid
), ops(_ops
), obs(NULL
), snapset(0),
611 modify(false), user_modify(false), undirty(false), cache_evict(false),
612 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
613 bytes_written(0), bytes_read(0), user_at_version(0),
614 current_osd_subop_num(0),
615 data_off(0), reply(NULL
), pg(_pg
),
619 async_read_result(0),
621 lock_type(ObjectContext::RWState::RWNONE
) {}
622 void reset_obs(ObjectContextRef obc
) {
623 new_obs
= ObjectState(obc
->obs
.oi
, obc
->obs
.exists
);
625 new_snapset
= obc
->ssc
->snapset
;
626 snapset
= &obc
->ssc
->snapset
;
633 for (list
<pair
<boost::tuple
<uint64_t, uint64_t, unsigned>,
634 pair
<bufferlist
*, Context
*> > >::iterator i
=
635 pending_async_reads
.begin();
636 i
!= pending_async_reads
.end();
637 pending_async_reads
.erase(i
++)) {
638 delete i
->second
.second
;
641 uint64_t get_features() {
642 if (op
&& op
->get_req()) {
643 return op
->get_req()->get_connection()->get_features();
648 using OpContextUPtr
= std::unique_ptr
<OpContext
>;
649 friend struct OpContext
;
652 * State on the PG primary associated with the replicated mutation
658 xlist
<RepGather
*>::item queue_item
;
666 bool rep_aborted
, rep_done
;
670 const bool applies_with_commit
;
674 eversion_t pg_local_last_complete
;
676 ObcLockManager lock_manager
;
678 list
<std::function
<void()>> on_applied
;
679 list
<std::function
<void()>> on_committed
;
680 list
<std::function
<void()>> on_success
;
681 list
<std::function
<void()>> on_finish
;
684 OpContext
*c
, ceph_tid_t rt
,
686 bool applies_with_commit
) :
687 hoid(c
->obc
->obs
.oi
.soid
),
692 rep_aborted(false), rep_done(false),
693 all_applied(false), all_committed(false),
694 applies_with_commit(applies_with_commit
),
695 pg_local_last_complete(lc
),
696 lock_manager(std::move(c
->lock_manager
)),
697 on_applied(std::move(c
->on_applied
)),
698 on_committed(std::move(c
->on_committed
)),
699 on_success(std::move(c
->on_success
)),
700 on_finish(std::move(c
->on_finish
)) {}
703 ObcLockManager
&&manager
,
705 boost::optional
<std::function
<void(void)> > &&on_complete
,
708 bool applies_with_commit
,
715 rep_aborted(false), rep_done(false),
716 all_applied(false), all_committed(false),
717 applies_with_commit(applies_with_commit
),
718 pg_local_last_complete(lc
),
719 lock_manager(std::move(manager
)) {
721 on_success
.push_back(std::move(*on_complete
));
732 assert(on_applied
.empty());
734 //generic_dout(0) << "deleting " << this << dendl;
743 * Grabs locks for OpContext, should be cleaned up in close_op_ctx
745 * @param ctx [in,out] ctx to get locks for
746 * @return true on success, false if we are queued
748 bool get_rw_locks(bool write_ordered
, OpContext
*ctx
) {
749 /* If snapset_obc, !obc->obs->exists and we will always take the
750 * snapdir lock *before* the head lock. Since all callers will do
751 * this (read or write) if we get the first we will be guaranteed
754 if (write_ordered
&& ctx
->op
->may_read()) {
755 ctx
->lock_type
= ObjectContext::RWState::RWEXCL
;
756 } else if (write_ordered
) {
757 ctx
->lock_type
= ObjectContext::RWState::RWWRITE
;
759 assert(ctx
->op
->may_read());
760 ctx
->lock_type
= ObjectContext::RWState::RWREAD
;
763 if (ctx
->snapset_obc
) {
764 assert(!ctx
->obc
->obs
.exists
);
765 if (!ctx
->lock_manager
.get_lock_type(
767 ctx
->snapset_obc
->obs
.oi
.soid
,
770 ctx
->lock_type
= ObjectContext::RWState::RWNONE
;
774 if (ctx
->lock_manager
.get_lock_type(
776 ctx
->obc
->obs
.oi
.soid
,
781 assert(!ctx
->snapset_obc
);
782 ctx
->lock_type
= ObjectContext::RWState::RWNONE
;
788 * Cleans up OpContext
790 * @param ctx [in] ctx to clean up
792 void close_op_ctx(OpContext
*ctx
) {
793 release_object_locks(ctx
->lock_manager
);
795 for (auto p
= ctx
->on_finish
.begin();
796 p
!= ctx
->on_finish
.end();
797 ctx
->on_finish
.erase(p
++)) {
806 * @param manager [in] manager with locks to release
808 void release_object_locks(
809 ObcLockManager
&lock_manager
) {
810 list
<pair
<hobject_t
, list
<OpRequestRef
> > > to_req
;
811 bool requeue_recovery
= false;
812 bool requeue_snaptrim
= false;
813 lock_manager
.put_locks(
817 if (requeue_recovery
)
819 if (requeue_snaptrim
)
820 snap_trimmer_machine
.process_event(TrimWriteUnblocked());
822 if (!to_req
.empty()) {
823 // requeue at front of scrub blocking queue if we are blocked by scrub
824 for (auto &&p
: to_req
) {
825 if (scrubber
.write_blocked_by_scrub(p
.first
.get_head())) {
826 waiting_for_scrub
.splice(
827 waiting_for_scrub
.begin(),
832 requeue_ops(p
.second
);
840 xlist
<RepGather
*> repop_queue
;
842 friend class C_OSD_RepopApplied
;
843 friend class C_OSD_RepopCommit
;
844 void repop_all_applied(RepGather
*repop
);
845 void repop_all_committed(RepGather
*repop
);
846 void eval_repop(RepGather
*);
847 void issue_repop(RepGather
*repop
, OpContext
*ctx
);
848 RepGather
*new_repop(
850 ObjectContextRef obc
,
852 boost::intrusive_ptr
<RepGather
> new_repop(
855 ObcLockManager
&&manager
,
857 boost::optional
<std::function
<void(void)> > &&on_complete
);
858 void remove_repop(RepGather
*repop
);
860 OpContextUPtr
simple_opc_create(ObjectContextRef obc
);
861 void simple_opc_submit(OpContextUPtr ctx
);
864 * Merge entries atomically into all actingbackfill osds
865 * adjusting missing and recovery state as necessary.
867 * Also used to store error log entries for dup detection.
869 void submit_log_entries(
870 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
871 ObcLockManager
&&manager
,
872 boost::optional
<std::function
<void(void)> > &&on_complete
,
873 OpRequestRef op
= OpRequestRef(),
875 struct LogUpdateCtx
{
876 boost::intrusive_ptr
<RepGather
> repop
;
877 set
<pg_shard_t
> waiting_on
;
879 void cancel_log_updates();
880 map
<ceph_tid_t
, LogUpdateCtx
> log_entry_update_waiting_on
;
884 HitSetRef hit_set
; ///< currently accumulating HitSet
885 utime_t hit_set_start_stamp
; ///< time the current HitSet started recording
888 void hit_set_clear(); ///< discard any HitSet state
889 void hit_set_setup(); ///< initialize HitSet state
890 void hit_set_create(); ///< create a new HitSet
891 void hit_set_persist(); ///< persist hit info
892 bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
893 void hit_set_trim(OpContextUPtr
&ctx
, unsigned max
); ///< discard old HitSets
894 void hit_set_in_memory_trim(uint32_t max_in_memory
); ///< discard old in memory HitSets
895 void hit_set_remove_all();
897 hobject_t
get_hit_set_current_object(utime_t stamp
);
898 hobject_t
get_hit_set_archive_object(utime_t start
,
903 boost::scoped_ptr
<TierAgentState
> agent_state
;
905 void agent_setup(); ///< initialize agent state
906 bool agent_work(int max
) override
///< entry point to do some agent work
908 return agent_work(max
, max
);
910 bool agent_work(int max
, int agent_flush_quota
) override
;
911 bool agent_maybe_flush(ObjectContextRef
& obc
); ///< maybe flush
912 bool agent_maybe_evict(ObjectContextRef
& obc
, bool after_flush
); ///< maybe evict
914 void agent_load_hit_sets(); ///< load HitSets, if needed
916 /// estimate object atime and temperature
918 /// @param oid [in] object name
919 /// @param temperature [out] relative temperature (# consider both access time and frequency)
920 void agent_estimate_temp(const hobject_t
& oid
, int *temperature
);
923 void agent_stop() override
;
924 void agent_delay() override
;
926 /// clear agent state
927 void agent_clear() override
;
929 /// choose (new) agent mode(s), returns true if op is requeued
930 bool agent_choose_mode(bool restart
= false, OpRequestRef op
= OpRequestRef());
931 void agent_choose_mode_restart() override
;
933 /// true if we can send an ondisk/commit for v
934 bool already_complete(eversion_t v
);
935 /// true if we can send an ack for v
936 bool already_ack(eversion_t v
);
938 // projected object info
939 SharedLRU
<hobject_t
, ObjectContext
> object_contexts
;
940 // map from oid.snapdir() to SnapSetContext *
941 map
<hobject_t
, SnapSetContext
*> snapset_contexts
;
942 Mutex snapset_contexts_lock
;
944 // debug order that client ops are applied
945 map
<hobject_t
, map
<client_t
, ceph_tid_t
>> debug_op_order
;
947 void populate_obc_watchers(ObjectContextRef obc
);
948 void check_blacklisted_obc_watchers(ObjectContextRef obc
);
949 void check_blacklisted_watchers() override
;
950 void get_watchers(list
<obj_watch_item_t
> &pg_watchers
) override
;
951 void get_obc_watchers(ObjectContextRef obc
, list
<obj_watch_item_t
> &pg_watchers
);
953 void handle_watch_timeout(WatchRef watch
);
956 ObjectContextRef
create_object_context(const object_info_t
& oi
, SnapSetContext
*ssc
);
957 ObjectContextRef
get_object_context(
958 const hobject_t
& soid
,
960 const map
<string
, bufferlist
> *attrs
= 0
963 void context_registry_on_change();
964 void object_context_destructor_callback(ObjectContext
*obc
);
965 class C_PG_ObjectContext
;
967 int find_object_context(const hobject_t
& oid
,
968 ObjectContextRef
*pobc
,
970 bool map_snapid_to_clone
=false,
971 hobject_t
*missing_oid
=NULL
);
973 void add_object_context_to_pg_stat(ObjectContextRef obc
, pg_stat_t
*stat
);
975 void get_src_oloc(const object_t
& oid
, const object_locator_t
& oloc
, object_locator_t
& src_oloc
);
977 SnapSetContext
*get_snapset_context(
978 const hobject_t
& oid
,
980 const map
<string
, bufferlist
> *attrs
= 0,
981 bool oid_existed
= true //indicate this oid whether exsited in backend
983 void register_snapset_context(SnapSetContext
*ssc
) {
984 Mutex::Locker
l(snapset_contexts_lock
);
985 _register_snapset_context(ssc
);
987 void _register_snapset_context(SnapSetContext
*ssc
) {
988 assert(snapset_contexts_lock
.is_locked());
989 if (!ssc
->registered
) {
990 assert(snapset_contexts
.count(ssc
->oid
) == 0);
991 ssc
->registered
= true;
992 snapset_contexts
[ssc
->oid
] = ssc
;
995 void put_snapset_context(SnapSetContext
*ssc
);
997 map
<hobject_t
, ObjectContextRef
> recovering
;
1002 * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
1004 * objects prior to peer_info[backfill_target].last_backfill
1006 * - are included in the peer stats
1008 * objects \in (last_backfill, last_backfill_started]
1009 * - are on the peer or are in backfills_in_flight
1010 * - are not included in pg stats (yet)
1011 * - have their stats in pending_backfill_updates on the primary
1013 set
<hobject_t
> backfills_in_flight
;
1014 map
<hobject_t
, pg_stat_t
> pending_backfill_updates
;
1016 void dump_recovery_info(Formatter
*f
) const override
{
1017 f
->open_array_section("backfill_targets");
1018 for (set
<pg_shard_t
>::const_iterator p
= backfill_targets
.begin();
1019 p
!= backfill_targets
.end(); ++p
)
1020 f
->dump_stream("replica") << *p
;
1022 f
->open_array_section("waiting_on_backfill");
1023 for (set
<pg_shard_t
>::const_iterator p
= waiting_on_backfill
.begin();
1024 p
!= waiting_on_backfill
.end(); ++p
)
1025 f
->dump_stream("osd") << *p
;
1027 f
->dump_stream("last_backfill_started") << last_backfill_started
;
1029 f
->open_object_section("backfill_info");
1030 backfill_info
.dump(f
);
1034 f
->open_array_section("peer_backfill_info");
1035 for (map
<pg_shard_t
, BackfillInterval
>::const_iterator pbi
=
1036 peer_backfill_info
.begin();
1037 pbi
!= peer_backfill_info
.end(); ++pbi
) {
1038 f
->dump_stream("osd") << pbi
->first
;
1039 f
->open_object_section("BackfillInterval");
1040 pbi
->second
.dump(f
);
1046 f
->open_array_section("backfills_in_flight");
1047 for (set
<hobject_t
>::const_iterator i
= backfills_in_flight
.begin();
1048 i
!= backfills_in_flight
.end();
1050 f
->dump_stream("object") << *i
;
1055 f
->open_array_section("recovering");
1056 for (map
<hobject_t
, ObjectContextRef
>::const_iterator i
= recovering
.begin();
1057 i
!= recovering
.end();
1059 f
->dump_stream("object") << i
->first
;
1064 f
->open_object_section("pg_backend");
1065 pgbackend
->dump_recovery_info(f
);
1070 /// last backfill operation started
1071 hobject_t last_backfill_started
;
1074 int prep_object_replica_pushes(const hobject_t
& soid
, eversion_t v
,
1075 PGBackend::RecoveryHandle
*h
);
1077 void finish_degraded_object(const hobject_t
& oid
);
1079 // Cancels/resets pulls from peer
1080 void check_recovery_sources(const OSDMapRef
& map
) override
;
1082 int recover_missing(
1083 const hobject_t
& oid
,
1086 PGBackend::RecoveryHandle
*h
);
1093 ObjectContextRef obc
,
1094 const hobject_t
& head
, const hobject_t
& coid
,
1095 object_info_t
*poi
);
1096 void execute_ctx(OpContext
*ctx
);
1097 void finish_ctx(OpContext
*ctx
, int log_op_type
, bool maintain_ssc
=true);
1098 void reply_ctx(OpContext
*ctx
, int err
);
1099 void reply_ctx(OpContext
*ctx
, int err
, eversion_t v
, version_t uv
);
1100 void make_writeable(OpContext
*ctx
);
1101 void log_op_stats(OpContext
*ctx
);
1103 void write_update_size_and_usage(object_stat_sum_t
& stats
, object_info_t
& oi
,
1104 interval_set
<uint64_t>& modified
, uint64_t offset
,
1105 uint64_t length
, bool write_full
=false);
1106 void add_interval_usage(interval_set
<uint64_t>& s
, object_stat_sum_t
& st
);
1109 enum class cache_result_t
{
1116 cache_result_t
maybe_handle_cache_detail(OpRequestRef op
,
1118 ObjectContextRef obc
, int r
,
1119 hobject_t missing_oid
,
1122 ObjectContextRef
*promote_obc
);
1123 cache_result_t
maybe_handle_manifest_detail(OpRequestRef op
,
1125 ObjectContextRef obc
);
1126 bool maybe_handle_manifest(OpRequestRef op
,
1128 ObjectContextRef obc
) {
1129 return cache_result_t::NOOP
!= maybe_handle_manifest_detail(
1136 * This helper function is called from do_op if the ObjectContext lookup fails.
1137 * @returns true if the caching code is handling the Op, false otherwise.
1139 bool maybe_handle_cache(OpRequestRef op
,
1141 ObjectContextRef obc
, int r
,
1142 const hobject_t
& missing_oid
,
1144 bool in_hit_set
= false) {
1145 return cache_result_t::NOOP
!= maybe_handle_cache_detail(
1157 * This helper function checks if a promotion is needed.
1159 bool maybe_promote(ObjectContextRef obc
,
1160 const hobject_t
& missing_oid
,
1161 const object_locator_t
& oloc
,
1164 OpRequestRef promote_op
,
1165 ObjectContextRef
*promote_obc
= nullptr);
1167 * This helper function tells the client to redirect their request elsewhere.
1169 void do_cache_redirect(OpRequestRef op
);
1171 * This function attempts to start a promote. Either it succeeds,
1172 * or places op on a wait list. If op is null, failure means that
1173 * this is a noop. If a future user wants to be able to distinguish
1174 * these cases, a return value should be added.
1176 void promote_object(
1177 ObjectContextRef obc
, ///< [optional] obc
1178 const hobject_t
& missing_object
, ///< oid (if !obc)
1179 const object_locator_t
& oloc
, ///< locator for obc|oid
1180 OpRequestRef op
, ///< [optional] client op
1181 ObjectContextRef
*promote_obc
= nullptr ///< [optional] new obc for object
1184 int prepare_transaction(OpContext
*ctx
);
1185 list
<pair
<OpRequestRef
, OpContext
*> > in_progress_async_reads
;
1186 void complete_read_ctx(int result
, OpContext
*ctx
);
1188 // pg on-disk content
1189 void check_local() override
;
1191 void _clear_recovery_state() override
;
1193 bool start_recovery_ops(
1195 ThreadPool::TPHandle
&handle
, uint64_t *started
) override
;
1197 uint64_t recover_primary(uint64_t max
, ThreadPool::TPHandle
&handle
);
1198 uint64_t recover_replicas(uint64_t max
, ThreadPool::TPHandle
&handle
);
1199 hobject_t
earliest_peer_backfill() const;
1200 bool all_peer_done() const;
1202 * @param work_started will be set to true if recover_backfill got anywhere
1203 * @returns the number of operations started
1205 uint64_t recover_backfill(uint64_t max
, ThreadPool::TPHandle
&handle
,
1206 bool *work_started
);
1209 * scan a (hash) range of objects in the current pg
1211 * @begin first item should be >= this value
1212 * @min return at least this many items, unless we are done
1213 * @max return no more than this many items
1214 * @bi [out] resulting map of objects to eversion_t's
1217 int min
, int max
, BackfillInterval
*bi
,
1218 ThreadPool::TPHandle
&handle
1221 /// Update a hash range to reflect changes since the last scan
1223 BackfillInterval
*bi
, ///< [in,out] interval to update
1224 ThreadPool::TPHandle
&handle
///< [in] tp handle
1227 int prep_backfill_object_push(
1228 hobject_t oid
, eversion_t v
, ObjectContextRef obc
,
1229 vector
<pg_shard_t
> peers
,
1230 PGBackend::RecoveryHandle
*h
);
1231 void send_remove_op(const hobject_t
& oid
, eversion_t v
, pg_shard_t peer
);
1234 class C_OSD_OndiskWriteUnlock
;
1235 class C_OSD_AppliedRecoveredObject
;
1236 class C_OSD_CommittedPushedObject
;
1237 class C_OSD_AppliedRecoveredObjectReplica
;
1238 void sub_op_remove(OpRequestRef op
);
1240 void _applied_recovered_object(ObjectContextRef obc
);
1241 void _applied_recovered_object_replica();
1242 void _committed_pushed_object(epoch_t epoch
, eversion_t lc
);
1243 void recover_got(hobject_t oid
, eversion_t v
);
1246 map
<hobject_t
, CopyOpRef
> copy_ops
;
1248 int fill_in_copy_get(
1250 bufferlist::iterator
& bp
,
1252 ObjectContextRef
& obc
);
1253 void fill_in_copy_get_noent(OpRequestRef
& op
, hobject_t oid
,
1257 * To copy an object, call start_copy.
1259 * @param cb: The CopyCallback to be activated when the copy is complete
1260 * @param obc: The ObjectContext we are copying into
1261 * @param src: The source object
1262 * @param oloc: the source object locator
1263 * @param version: the version of the source object to copy (0 for any)
1265 void start_copy(CopyCallback
*cb
, ObjectContextRef obc
, hobject_t src
,
1266 object_locator_t oloc
, version_t version
, unsigned flags
,
1267 bool mirror_snapset
, unsigned src_obj_fadvise_flags
,
1268 unsigned dest_obj_fadvise_flags
);
1269 void process_copy_chunk(hobject_t oid
, ceph_tid_t tid
, int r
);
1270 void _write_copy_chunk(CopyOpRef cop
, PGTransaction
*t
);
1271 uint64_t get_copy_chunk_size() const {
1272 uint64_t size
= cct
->_conf
->osd_copyfrom_max_chunk
;
1273 if (pool
.info
.requires_aligned_append()) {
1274 uint64_t alignment
= pool
.info
.required_alignment();
1275 if (size
% alignment
) {
1276 size
+= alignment
- (size
% alignment
);
1281 void _copy_some(ObjectContextRef obc
, CopyOpRef cop
);
1282 void finish_copyfrom(OpContext
*ctx
);
1283 void finish_promote(int r
, CopyResults
*results
, ObjectContextRef obc
);
1284 void cancel_copy(CopyOpRef cop
, bool requeue
);
1285 void cancel_copy_ops(bool requeue
);
1287 friend struct C_Copyfrom
;
1290 map
<hobject_t
, FlushOpRef
> flush_ops
;
1292 /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
1294 OpRequestRef op
, ObjectContextRef obc
,
1295 bool blocking
, hobject_t
*pmissing
,
1296 boost::optional
<std::function
<void()>> &&on_flush
);
1297 void finish_flush(hobject_t oid
, ceph_tid_t tid
, int r
);
1298 int try_flush_mark_clean(FlushOpRef fop
);
1299 void cancel_flush(FlushOpRef fop
, bool requeue
);
1300 void cancel_flush_ops(bool requeue
);
1302 /// @return false if clone is has been evicted
1303 bool is_present_clone(hobject_t coid
);
1305 friend struct C_Flush
;
1308 bool _range_available_for_scrub(
1309 const hobject_t
&begin
, const hobject_t
&end
) override
;
1310 void scrub_snapshot_metadata(
1312 const std::map
<hobject_t
, pair
<uint32_t, uint32_t>> &missing_digest
) override
;
1313 void _scrub_clear_state() override
;
1314 void _scrub_finish() override
;
1315 object_stat_collection_t scrub_cstat
;
1317 void _split_into(pg_t child_pgid
, PG
*child
,
1318 unsigned split_bits
) override
;
1319 void apply_and_flush_repops(bool requeue
);
1321 void calc_trim_to() override
;
1322 int do_xattr_cmp_u64(int op
, __u64 v1
, bufferlist
& xattr
);
1323 int do_xattr_cmp_str(int op
, string
& v1s
, bufferlist
& xattr
);
1326 int do_checksum(OpContext
*ctx
, OSDOp
& osd_op
, bufferlist::iterator
*bl_it
,
1328 int finish_checksum(OSDOp
& osd_op
, Checksummer::CSumType csum_type
,
1329 bufferlist::iterator
*init_value_bl_it
,
1330 const bufferlist
&read_bl
);
1332 friend class C_ChecksumRead
;
1334 int do_extent_cmp(OpContext
*ctx
, OSDOp
& osd_op
);
1335 int do_writesame(OpContext
*ctx
, OSDOp
& osd_op
);
1337 bool pgls_filter(PGLSFilter
*filter
, hobject_t
& sobj
, bufferlist
& outdata
);
1338 int get_pgls_filter(bufferlist::iterator
& iter
, PGLSFilter
**pfilter
);
1340 map
<hobject_t
, list
<OpRequestRef
>> in_progress_proxy_ops
;
1341 void kick_proxy_ops_blocked(hobject_t
& soid
);
1342 void cancel_proxy_ops(bool requeue
);
1345 map
<ceph_tid_t
, ProxyReadOpRef
> proxyread_ops
;
1347 void do_proxy_read(OpRequestRef op
, ObjectContextRef obc
= NULL
);
1348 void finish_proxy_read(hobject_t oid
, ceph_tid_t tid
, int r
);
1349 void cancel_proxy_read(ProxyReadOpRef prdop
);
1351 friend struct C_ProxyRead
;
1354 map
<ceph_tid_t
, ProxyWriteOpRef
> proxywrite_ops
;
1356 void do_proxy_write(OpRequestRef op
, const hobject_t
& missing_oid
, ObjectContextRef obc
= NULL
);
1357 void finish_proxy_write(hobject_t oid
, ceph_tid_t tid
, int r
);
1358 void cancel_proxy_write(ProxyWriteOpRef pwop
);
1360 friend struct C_ProxyWrite_Commit
;
1363 PrimaryLogPG(OSDService
*o
, OSDMapRef curmap
,
1364 const PGPool
&_pool
, spg_t p
);
1365 ~PrimaryLogPG() override
{}
1373 ceph_tid_t tid
) override
;
1377 ThreadPool::TPHandle
&handle
) override
;
1378 void do_op(OpRequestRef
& op
) override
;
1379 void record_write_error(OpRequestRef op
, const hobject_t
&soid
,
1380 MOSDOpReply
*orig_reply
, int r
);
1381 void do_pg_op(OpRequestRef op
);
1382 void do_sub_op(OpRequestRef op
) override
;
1383 void do_sub_op_reply(OpRequestRef op
) override
;
1386 ThreadPool::TPHandle
&handle
) override
;
1387 void do_backfill(OpRequestRef op
) override
;
1388 void do_backfill_remove(OpRequestRef op
);
1390 void handle_backoff(OpRequestRef
& op
);
1392 int trim_object(bool first
, const hobject_t
&coid
, OpContextUPtr
*ctxp
);
1393 void snap_trimmer(epoch_t e
) override
;
1394 void kick_snap_trim() override
;
1395 void snap_trimmer_scrub_complete() override
;
1396 int do_osd_ops(OpContext
*ctx
, vector
<OSDOp
>& ops
);
1398 int _get_tmap(OpContext
*ctx
, bufferlist
*header
, bufferlist
*vals
);
1399 int do_tmap2omap(OpContext
*ctx
, unsigned flags
);
1400 int do_tmapup(OpContext
*ctx
, bufferlist::iterator
& bp
, OSDOp
& osd_op
);
1401 int do_tmapup_slow(OpContext
*ctx
, bufferlist::iterator
& bp
, OSDOp
& osd_op
, bufferlist
& bl
);
1403 void do_osd_op_effects(OpContext
*ctx
, const ConnectionRef
& conn
);
1405 int do_scrub_ls(MOSDOp
*op
, OSDOp
*osd_op
);
1406 hobject_t
earliest_backfill() const;
1407 bool check_src_targ(const hobject_t
& soid
, const hobject_t
& toid
) const;
1409 uint64_t temp_seq
; ///< last id for naming temp objects
1410 /// generate a new temp object name
1411 hobject_t
generate_temp_object(const hobject_t
& target
);
1412 /// generate a new temp object name (for recovery)
1413 hobject_t
get_temp_recovery_object(const hobject_t
& target
,
1414 eversion_t version
) override
;
1415 int get_recovery_op_priority() const {
1417 pool
.info
.opts
.get(pool_opts_t::RECOVERY_OP_PRIORITY
, &pri
);
1418 return pri
> 0 ? pri
: cct
->_conf
->osd_recovery_op_priority
;
1420 void log_missing(unsigned missing
,
1421 const boost::optional
<hobject_t
> &head
,
1426 bool allow_incomplete_clones
);
1427 unsigned process_clones_to(const boost::optional
<hobject_t
> &head
,
1428 const boost::optional
<SnapSet
> &snapset
,
1432 bool allow_incomplete_clones
,
1433 boost::optional
<snapid_t
> target
,
1434 vector
<snapid_t
>::reverse_iterator
*curclone
,
1435 inconsistent_snapset_wrapper
&snap_error
);
1445 const pg_pool_t
*pool
,
1446 ObjectStore::Transaction
*t
) override
{
1447 coll_t target
= coll_t(child
);
1448 PG::_create(*t
, child
, split_bits
);
1449 t
->split_collection(
1454 PG::_init(*t
, child
, pool
);
1458 struct DoSnapWork
: boost::statechart::event
< DoSnapWork
> {
1459 DoSnapWork() : boost::statechart::event
< DoSnapWork
>() {}
1461 struct KickTrim
: boost::statechart::event
< KickTrim
> {
1462 KickTrim() : boost::statechart::event
< KickTrim
>() {}
1464 struct RepopsComplete
: boost::statechart::event
< RepopsComplete
> {
1465 RepopsComplete() : boost::statechart::event
< RepopsComplete
>() {}
1467 struct ScrubComplete
: boost::statechart::event
< ScrubComplete
> {
1468 ScrubComplete() : boost::statechart::event
< ScrubComplete
>() {}
1470 struct TrimWriteUnblocked
: boost::statechart::event
< TrimWriteUnblocked
> {
1471 TrimWriteUnblocked() : boost::statechart::event
< TrimWriteUnblocked
>() {}
1473 struct Reset
: boost::statechart::event
< Reset
> {
1474 Reset() : boost::statechart::event
< Reset
>() {}
1476 struct SnapTrimReserved
: boost::statechart::event
< SnapTrimReserved
> {
1477 SnapTrimReserved() : boost::statechart::event
< SnapTrimReserved
>() {}
1479 struct SnapTrimTimerReady
: boost::statechart::event
< SnapTrimTimerReady
> {
1480 SnapTrimTimerReady() : boost::statechart::event
< SnapTrimTimerReady
>() {}
1484 struct SnapTrimmer
: public boost::statechart::state_machine
< SnapTrimmer
, NotTrimming
> {
1486 explicit SnapTrimmer(PrimaryLogPG
*pg
) : pg(pg
) {}
1487 void log_enter(const char *state_name
);
1488 void log_exit(const char *state_name
, utime_t duration
);
1490 return pg
->is_clean() && !pg
->scrubber
.active
&& !pg
->snap_trimq
.empty();
1492 } snap_trimmer_machine
;
1494 struct WaitReservation
;
1495 struct Trimming
: boost::statechart::state
< Trimming
, SnapTrimmer
, WaitReservation
>, NamedState
{
1496 typedef boost::mpl::list
<
1497 boost::statechart::custom_reaction
< KickTrim
>,
1498 boost::statechart::transition
< Reset
, NotTrimming
>
1501 set
<hobject_t
> in_flight
;
1502 snapid_t snap_to_trim
;
1504 explicit Trimming(my_context ctx
)
1506 NamedState(context
< SnapTrimmer
>().pg
, "Trimming") {
1507 context
< SnapTrimmer
>().log_enter(state_name
);
1508 assert(context
< SnapTrimmer
>().can_trim());
1509 assert(in_flight
.empty());
1512 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1513 auto *pg
= context
< SnapTrimmer
>().pg
;
1514 pg
->osd
->snap_reserver
.cancel_reservation(pg
->get_pgid());
1515 pg
->state_clear(PG_STATE_SNAPTRIM
);
1516 pg
->publish_stats_to_osd();
1518 boost::statechart::result
react(const KickTrim
&) {
1519 return discard_event();
1523 /* SnapTrimmerStates */
1524 struct WaitTrimTimer
: boost::statechart::state
< WaitTrimTimer
, Trimming
>, NamedState
{
1525 typedef boost::mpl::list
<
1526 boost::statechart::custom_reaction
< SnapTrimTimerReady
>
1528 Context
*wakeup
= nullptr;
1529 explicit WaitTrimTimer(my_context ctx
)
1531 NamedState(context
< SnapTrimmer
>().pg
, "Trimming/WaitTrimTimer") {
1532 context
< SnapTrimmer
>().log_enter(state_name
);
1533 assert(context
<Trimming
>().in_flight
.empty());
1534 struct OnTimer
: Context
{
1537 OnTimer(PrimaryLogPGRef pg
, epoch_t epoch
) : pg(pg
), epoch(epoch
) {}
1538 void finish(int) override
{
1540 if (!pg
->pg_has_reset_since(epoch
))
1541 pg
->snap_trimmer_machine
.process_event(SnapTrimTimerReady());
1545 auto *pg
= context
< SnapTrimmer
>().pg
;
1546 if (pg
->cct
->_conf
->osd_snap_trim_sleep
> 0) {
1547 wakeup
= new OnTimer
{pg
, pg
->get_osdmap()->get_epoch()};
1548 Mutex::Locker
l(pg
->osd
->snap_sleep_lock
);
1549 pg
->osd
->snap_sleep_timer
.add_event_after(
1550 pg
->cct
->_conf
->osd_snap_trim_sleep
, wakeup
);
1552 post_event(SnapTrimTimerReady());
1556 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1557 auto *pg
= context
< SnapTrimmer
>().pg
;
1559 Mutex::Locker
l(pg
->osd
->snap_sleep_lock
);
1560 pg
->osd
->snap_sleep_timer
.cancel_event(wakeup
);
1564 boost::statechart::result
react(const SnapTrimTimerReady
&) {
1566 if (!context
< SnapTrimmer
>().can_trim()) {
1567 post_event(KickTrim());
1568 return transit
< NotTrimming
>();
1570 return transit
< AwaitAsyncWork
>();
1575 struct WaitRWLock
: boost::statechart::state
< WaitRWLock
, Trimming
>, NamedState
{
1576 typedef boost::mpl::list
<
1577 boost::statechart::custom_reaction
< TrimWriteUnblocked
>
1579 explicit WaitRWLock(my_context ctx
)
1581 NamedState(context
< SnapTrimmer
>().pg
, "Trimming/WaitRWLock") {
1582 context
< SnapTrimmer
>().log_enter(state_name
);
1583 assert(context
<Trimming
>().in_flight
.empty());
1586 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1588 boost::statechart::result
react(const TrimWriteUnblocked
&) {
1589 if (!context
< SnapTrimmer
>().can_trim()) {
1590 post_event(KickTrim());
1591 return transit
< NotTrimming
>();
1593 return transit
< AwaitAsyncWork
>();
1598 struct WaitRepops
: boost::statechart::state
< WaitRepops
, Trimming
>, NamedState
{
1599 typedef boost::mpl::list
<
1600 boost::statechart::custom_reaction
< RepopsComplete
>
1602 explicit WaitRepops(my_context ctx
)
1604 NamedState(context
< SnapTrimmer
>().pg
, "Trimming/WaitRepops") {
1605 context
< SnapTrimmer
>().log_enter(state_name
);
1606 assert(!context
<Trimming
>().in_flight
.empty());
1609 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1611 boost::statechart::result
react(const RepopsComplete
&) {
1612 if (!context
< SnapTrimmer
>().can_trim()) {
1613 post_event(KickTrim());
1614 return transit
< NotTrimming
>();
1616 return transit
< WaitTrimTimer
>();
1621 struct AwaitAsyncWork
: boost::statechart::state
< AwaitAsyncWork
, Trimming
>, NamedState
{
1622 typedef boost::mpl::list
<
1623 boost::statechart::custom_reaction
< DoSnapWork
>
1625 explicit AwaitAsyncWork(my_context ctx
);
1627 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1629 boost::statechart::result
react(const DoSnapWork
&);
1632 struct WaitReservation
: boost::statechart::state
< WaitReservation
, Trimming
>, NamedState
{
1633 /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
1634 * always cancels the reservation */
1635 typedef boost::mpl::list
<
1636 boost::statechart::custom_reaction
< SnapTrimReserved
>
1638 struct ReservationCB
: public Context
{
1641 ReservationCB(PrimaryLogPG
*pg
) : pg(pg
), canceled(false) {}
1642 void finish(int) override
{
1645 pg
->snap_trimmer_machine
.process_event(SnapTrimReserved());
1649 assert(pg
->is_locked());
1654 ReservationCB
*pending
= nullptr;
1656 explicit WaitReservation(my_context ctx
)
1658 NamedState(context
< SnapTrimmer
>().pg
, "Trimming/WaitReservation") {
1659 context
< SnapTrimmer
>().log_enter(state_name
);
1660 assert(context
<Trimming
>().in_flight
.empty());
1661 auto *pg
= context
< SnapTrimmer
>().pg
;
1662 pending
= new ReservationCB(pg
);
1663 pg
->osd
->snap_reserver
.request_reservation(
1667 pg
->state_set(PG_STATE_SNAPTRIM_WAIT
);
1668 pg
->publish_stats_to_osd();
1670 boost::statechart::result
react(const SnapTrimReserved
&);
1672 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1676 auto *pg
= context
< SnapTrimmer
>().pg
;
1677 pg
->state_clear(PG_STATE_SNAPTRIM_WAIT
);
1678 pg
->state_clear(PG_STATE_SNAPTRIM_ERROR
);
1679 pg
->publish_stats_to_osd();
1683 struct WaitScrub
: boost::statechart::state
< WaitScrub
, SnapTrimmer
>, NamedState
{
1684 typedef boost::mpl::list
<
1685 boost::statechart::custom_reaction
< ScrubComplete
>,
1686 boost::statechart::custom_reaction
< KickTrim
>,
1687 boost::statechart::transition
< Reset
, NotTrimming
>
1689 explicit WaitScrub(my_context ctx
)
1691 NamedState(context
< SnapTrimmer
>().pg
, "Trimming/WaitScrub") {
1692 context
< SnapTrimmer
>().log_enter(state_name
);
1695 context
< SnapTrimmer
>().log_exit(state_name
, enter_time
);
1697 boost::statechart::result
react(const ScrubComplete
&) {
1698 post_event(KickTrim());
1699 return transit
< NotTrimming
>();
1701 boost::statechart::result
react(const KickTrim
&) {
1702 return discard_event();
1706 struct NotTrimming
: boost::statechart::state
< NotTrimming
, SnapTrimmer
>, NamedState
{
1707 typedef boost::mpl::list
<
1708 boost::statechart::custom_reaction
< KickTrim
>,
1709 boost::statechart::transition
< Reset
, NotTrimming
>
1711 explicit NotTrimming(my_context ctx
);
1713 boost::statechart::result
react(const KickTrim
&);
1716 int _verify_no_head_clones(const hobject_t
& soid
,
1718 // return true if we're creating a local object, false for a
1719 // whiteout or no change.
1720 void maybe_create_new_object(OpContext
*ctx
, bool ignore_transaction
=false);
1721 int _delete_oid(OpContext
*ctx
, bool no_whiteout
, bool try_no_whiteout
);
1722 int _rollback_to(OpContext
*ctx
, ceph_osd_op
& op
);
1724 bool is_missing_object(const hobject_t
& oid
) const;
1725 bool is_unreadable_object(const hobject_t
&oid
) const {
1726 return is_missing_object(oid
) ||
1727 !missing_loc
.readable_with_acting(oid
, actingset
);
1729 void maybe_kick_recovery(const hobject_t
&soid
);
1730 void wait_for_unreadable_object(const hobject_t
& oid
, OpRequestRef op
);
1731 void wait_for_all_missing(OpRequestRef op
);
1733 bool is_degraded_or_backfilling_object(const hobject_t
& oid
);
1734 void wait_for_degraded_object(const hobject_t
& oid
, OpRequestRef op
);
1736 void block_write_on_full_cache(
1737 const hobject_t
& oid
, OpRequestRef op
);
1738 void block_for_clean(
1739 const hobject_t
& oid
, OpRequestRef op
);
1740 void block_write_on_snap_rollback(
1741 const hobject_t
& oid
, ObjectContextRef obc
, OpRequestRef op
);
1742 void block_write_on_degraded_snap(const hobject_t
& oid
, OpRequestRef op
);
1744 bool maybe_await_blocked_snapset(const hobject_t
&soid
, OpRequestRef op
);
1745 void wait_for_blocked_object(const hobject_t
& soid
, OpRequestRef op
);
1746 void kick_object_context_blocked(ObjectContextRef obc
);
1748 void maybe_force_recovery();
1750 void mark_all_unfound_lost(
1754 eversion_t
pick_newest_available(const hobject_t
& oid
);
1756 void do_update_log_missing(
1759 void do_update_log_missing_reply(
1762 void on_role_change() override
;
1763 void on_pool_change() override
;
1764 void _on_new_interval() override
;
1765 void on_change(ObjectStore::Transaction
*t
) override
;
1766 void on_activate() override
;
1767 void on_flushed() override
;
1768 void on_removal(ObjectStore::Transaction
*t
) override
;
1769 void on_shutdown() override
;
1770 bool check_failsafe_full(ostream
&ss
) override
;
1771 bool check_osdmap_full(const set
<pg_shard_t
> &missing_on
) override
;
1772 int rep_repair_primary_object(const hobject_t
& soid
, OpRequestRef op
);
1774 // attr cache handling
1775 void setattr_maybe_cache(
1776 ObjectContextRef obc
,
1781 void setattrs_maybe_cache(
1782 ObjectContextRef obc
,
1785 map
<string
, bufferlist
> &attrs
);
1786 void rmattr_maybe_cache(
1787 ObjectContextRef obc
,
1791 int getattr_maybe_cache(
1792 ObjectContextRef obc
,
1795 int getattrs_maybe_cache(
1796 ObjectContextRef obc
,
1797 map
<string
, bufferlist
> *out
,
1798 bool user_only
= false);
1801 inline ostream
& operator<<(ostream
& out
, const PrimaryLogPG::RepGather
& repop
)
1803 out
<< "repgather(" << &repop
1805 << " rep_tid=" << repop
.rep_tid
1806 << " committed?=" << repop
.all_committed
1807 << " applied?=" << repop
.all_applied
1813 inline ostream
& operator<<(ostream
& out
,
1814 const PrimaryLogPG::ProxyWriteOpRef
& pwop
)
1816 out
<< "proxywrite(" << &pwop
1817 << " " << pwop
->user_version
1818 << " pwop_tid=" << pwop
->objecter_tid
;
1820 out
<< " op=" << *(pwop
->ctx
->op
->get_req());
1825 void intrusive_ptr_add_ref(PrimaryLogPG::RepGather
*repop
);
1826 void intrusive_ptr_release(PrimaryLogPG::RepGather
*repop
);