// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
 *
 * Author: Loic Dachary <loic@dachary.org>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#ifndef CEPH_REPLICATEDPG_H
#define CEPH_REPLICATEDPG_H

#include <boost/tuple/tuple.hpp>
#include "include/ceph_assert.h"
#include "DynamicPerfStats.h"
#include "TierAgentState.h"
#include "messages/MOSDOpReply.h"
#include "common/Checksummer.h"
#include "common/sharedptr_registry.hpp"
#include "common/shared_cache.hpp"
#include "ReplicatedBackend.h"
#include "PGTransaction.h"
#include "cls/cas/cls_cas_ops.h"
class CopyFromCallback;
class PromoteCallback;
struct RefCountCallback;

struct TierAgentState;

class PrimaryLogPG;
void intrusive_ptr_add_ref(PrimaryLogPG *pg);
void intrusive_ptr_release(PrimaryLogPG *pg);
uint64_t get_with_id(PrimaryLogPG *pg);
void put_with_id(PrimaryLogPG *pg, uint64_t id);

#ifdef PG_DEBUG_REFS
typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
#else
typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
#endif

struct inconsistent_snapset_wrapper;
class PrimaryLogPG : public PG, public PGBackend::Listener {
  friend class PrimaryLogScrub;

public:
  MEMPOOL_CLASS_HELPERS();
  /*
   * state associated with a copy operation
   */
  struct OpContext;
  class CopyCallback;

  /**
   * CopyResults stores the object metadata of interest to a copy initiator.
   */
  struct CopyResults {
    ceph::real_time mtime; ///< the copy source's mtime
    uint64_t object_size; ///< the copied object's size
    bool started_temp_obj; ///< true if the callback needs to delete temp object
    hobject_t temp_oid;    ///< temp object (if any)

    /**
     * Function to fill in transaction; if non-empty the callback
     * must execute it before any other accesses to the object
     * (in order to complete the copy).
     */
    std::function<void(PGTransaction *)> fill_in_final_tx;

    version_t user_version; ///< The copy source's user version
    bool should_requeue;    ///< op should be requeued on cancel
    std::vector<snapid_t> snaps;  ///< src's snaps (if clone)
    snapid_t snap_seq;            ///< src's snap_seq (if head)
    librados::snap_set_t snapset; ///< src snapset (if head)
    bool mirror_snapset;
    uint32_t flags;    // object_copy_data_t::FLAG_*
    uint32_t source_data_digest, source_omap_digest;
    uint32_t data_digest, omap_digest;
    mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
    mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // maps reqids by index to error code
    std::map<std::string, ceph::buffer::list, std::less<>> attrs; // xattrs
    uint64_t truncate_seq;
    uint64_t truncate_size;
    bool is_data_digest() {
      return flags & object_copy_data_t::FLAG_DATA_DIGEST;
    }
    bool is_omap_digest() {
      return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
    }
    CopyResults()
      : object_size(0), started_temp_obj(false),
	user_version(0),
	should_requeue(false), mirror_snapset(false),
	flags(0),
	source_data_digest(-1), source_omap_digest(-1),
	data_digest(-1), omap_digest(-1),
	truncate_seq(0), truncate_size(0)
    {}
  };
  struct CopyOp;
  typedef std::shared_ptr<CopyOp> CopyOpRef;

  struct CopyOp {
    CopyCallback *cb;
    ObjectContextRef obc;
    hobject_t src;
    object_locator_t oloc;
    unsigned flags;
    bool mirror_snapset;

    CopyResults results;

    ceph_tid_t objecter_tid;
    ceph_tid_t objecter_tid2;

    object_copy_cursor_t cursor;
    std::map<std::string,ceph::buffer::list,std::less<>> attrs;
    ceph::buffer::list data;
    ceph::buffer::list omap_header;
    ceph::buffer::list omap_data;

    object_copy_cursor_t temp_cursor;

    /*
     * For CopyOp the process is:
     * step 1: read the data (attrs/omap/data) from the source object
     * step 2: handle that data (i.e., create a new object from it)
     * src_obj_fadvise_flags is used in step 1;
     * dest_obj_fadvise_flags is used in step 2
     */
    unsigned src_obj_fadvise_flags;
    unsigned dest_obj_fadvise_flags;

    std::map<uint64_t, CopyOpRef> chunk_cops;
    uint64_t start_offset = 0;
    uint64_t last_offset = 0;
    std::vector<OSDOp> chunk_ops;

    CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
	   object_locator_t l, version_t v, unsigned f, bool ms,
	   unsigned src_obj_fadvise_flags,
	   unsigned dest_obj_fadvise_flags)
      : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
	mirror_snapset(ms),
	src_obj_fadvise_flags(src_obj_fadvise_flags),
	dest_obj_fadvise_flags(dest_obj_fadvise_flags)
    {
      results.user_version = v;
      results.mirror_snapset = mirror_snapset;
    }
  };
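
  // Illustrative sketch (not from this header): a cache-tier promote might
  // read the source with NOCACHE and write the destination with DONTNEED so
  // neither step pollutes the OSD cache, e.g.
  //
  //   start_copy(cb, obc, src, oloc, 0 /* any version */, flags,
  //              false /* mirror_snapset */,
  //              CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,    // step 1: source read
  //              CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);  // step 2: dest write
  //
  // The CEPH_OSD_OP_FLAG_FADVISE_* values are the fadvise hints used
  // elsewhere in the OSD; the exact combination above is only an example.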
  /**
   * The CopyCallback class defines an interface for completions to the
   * copy_start code.  Users of the copy infrastructure must implement
   * one and give an instance of the class to start_copy.
   *
   * The implementer is responsible for making sure that the CopyCallback
   * can associate itself with the correct copy operation.
   */
  typedef boost::tuple<int, CopyResults*> CopyCallbackResults;

  friend class CopyFromCallback;
  friend struct CopyFromFinisher;
  friend class PromoteCallback;
  friend struct PromoteFinisher;
  friend struct C_gather;
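
  // Sketch of a CopyCallback user (hypothetical subclass; the real
  // implementations live in the .cc as CopyFromCallback/PromoteCallback):
  //
  //   struct MyCopyCb : CopyCallback {
  //     void finish(CopyCallbackResults results) override {
  //       int r = results.get<0>();
  //       CopyResults *res = results.get<1>();
  //       // associate (r, res) with the copy op this callback was given to
  //     }
  //   };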
  struct ProxyReadOp {
    OpRequestRef op;
    hobject_t soid;
    ceph_tid_t objecter_tid;
    std::vector<OSDOp> &ops;
    version_t user_version;
    int data_offset;
    bool canceled;              ///< true if canceled

    ProxyReadOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops)
      : op(_op), soid(oid),
	objecter_tid(0), ops(_ops),
	user_version(0), data_offset(0),
	canceled(false) { }
  };
  typedef std::shared_ptr<ProxyReadOp> ProxyReadOpRef;
  struct ProxyWriteOp {
    OpContext *ctx;
    OpRequestRef op;
    hobject_t soid;
    ceph_tid_t objecter_tid;
    std::vector<OSDOp> &ops;
    version_t user_version;
    bool sent_reply;
    bool canceled;
    osd_reqid_t reqid;

    ProxyWriteOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops, osd_reqid_t _reqid)
      : ctx(NULL), op(_op), soid(oid),
	objecter_tid(0), ops(_ops),
	user_version(0), sent_reply(false),
	canceled(false),
	reqid(_reqid) { }
  };
  typedef std::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
  struct FlushOp {
    ObjectContextRef obc;            ///< obc we are flushing
    OpRequestRef op;                 ///< initiating op
    std::list<OpRequestRef> dup_ops; ///< bandwagon jumpers
    version_t flushed_version;       ///< user version we are flushing
    ceph_tid_t objecter_tid;         ///< copy-from request tid
    int rval;                        ///< copy-from result
    bool blocking;                   ///< whether we are blocking updates
    bool removal;                    ///< we are removing the backend object
    std::optional<std::function<void()>> on_flush; ///< callback, may be null
    // for chunked object
    std::map<uint64_t, int> io_results;
    std::map<uint64_t, ceph_tid_t> io_tids;
    uint64_t chunks;

    FlushOp()
      : flushed_version(0), objecter_tid(0), rval(0),
	blocking(false), removal(false), chunks(0) {}
    ~FlushOp() { ceph_assert(!on_flush); }
  };
  typedef std::shared_ptr<FlushOp> FlushOpRef;
  struct CLSGatherOp {
    OpContext *ctx = nullptr;
    ObjectContextRef obc;
    OpRequestRef op;
    std::vector<ceph_tid_t> objecter_tids;

    CLSGatherOp(OpContext *ctx_, ObjectContextRef obc_, OpRequestRef op_)
      : ctx(ctx_), obc(obc_), op(op_) {}
  };
  friend struct RefCountCallback;
  struct ManifestOp {
    RefCountCallback *cb = nullptr;
    ceph_tid_t objecter_tid = 0;
    std::map<uint64_t, int> results;
    std::map<uint64_t, ceph_tid_t> tids;
    std::map<hobject_t, std::pair<uint64_t, uint64_t>> chunks;
    uint64_t num_chunks = 0;
    object_manifest_t new_manifest;
    ObjectContextRef obc;

    ManifestOp(ObjectContextRef obc, RefCountCallback* cb)
      : cb(cb), obc(obc) {}
    ManifestOp() = delete;
  };
  typedef std::shared_ptr<ManifestOp> ManifestOpRef;
  std::map<hobject_t, ManifestOpRef> manifest_ops;
  boost::scoped_ptr<PGBackend> pgbackend;
  PGBackend *get_pgbackend() override {
    return pgbackend.get();
  }

  const PGBackend *get_pgbackend() const override {
    return pgbackend.get();
  }

  DoutPrefixProvider *get_dpp() override {
    return this;
  }
  void on_local_recover(
    const hobject_t &oid,
    const ObjectRecoveryInfo &recovery_info,
    ObjectContextRef obc,
    bool is_delete,
    ObjectStore::Transaction *t) override;

  void on_peer_recover(
    pg_shard_t peer,
    const hobject_t &oid,
    const ObjectRecoveryInfo &recovery_info) override {
    recovery_state.on_peer_recover(peer, oid, recovery_info.version);
  }

  void begin_peer_recover(
    pg_shard_t peer,
    const hobject_t oid) override {
    recovery_state.begin_peer_recover(peer, oid);
  }

  void on_global_recover(
    const hobject_t &oid,
    const object_stat_sum_t &stat_diff,
    bool is_delete) override;
  void failed_push(
    const std::set<pg_shard_t> &from,
    const hobject_t &soid,
    const eversion_t &version) override;
  void cancel_pull(const hobject_t &soid) override;
  void apply_stats(
    const hobject_t &soid,
    const object_stat_sum_t &delta_stats) override;

  bool primary_error(const hobject_t& soid, eversion_t v);

  void remove_missing_object(const hobject_t &oid,
			     eversion_t v,
			     Context *on_complete) override;
  template<class T> class BlessedGenContext;
  template<class T> class UnlockedBlessedGenContext;
  class BlessedContext;
  Context *bless_context(Context *c) override;

  GenContext<ThreadPool::TPHandle&> *bless_gencontext(
    GenContext<ThreadPool::TPHandle&> *c) override;
  GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
    GenContext<ThreadPool::TPHandle&> *c) override;
  void send_message(int to_osd, Message *m) override {
    osd->send_message_osd_cluster(to_osd, m, get_osdmap_epoch());
  }
  void queue_transaction(ObjectStore::Transaction&& t,
			 OpRequestRef op) override {
    osd->store->queue_transaction(ch, std::move(t), op);
  }
  void queue_transactions(std::vector<ObjectStore::Transaction>& tls,
			  OpRequestRef op) override {
    osd->store->queue_transactions(ch, tls, op, NULL);
  }
  epoch_t get_interval_start_epoch() const override {
    return info.history.same_interval_since;
  }
  epoch_t get_last_peering_reset_epoch() const override {
    return get_last_peering_reset();
  }
  const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
    return get_acting_recovery_backfill();
  }
  const std::set<pg_shard_t> &get_acting_shards() const override {
    return recovery_state.get_actingset();
  }
  const std::set<pg_shard_t> &get_backfill_shards() const override {
    return get_backfill_targets();
  }
  std::ostream& gen_dbg_prefix(std::ostream& out) const override {
    return gen_prefix(out);
  }

  const HobjToShardSetMapping& get_missing_loc_shards() const override {
    return recovery_state.get_missing_loc().get_missing_locs();
  }
  const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
    return recovery_state.get_peer_missing();
  }
  using PGBackend::Listener::get_shard_missing;
  const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
    return recovery_state.get_peer_info();
  }
  using PGBackend::Listener::get_shard_info;
  const pg_missing_tracker_t &get_local_missing() const override {
    return recovery_state.get_pg_log().get_missing();
  }
  const PGLog &get_log() const override {
    return recovery_state.get_pg_log();
  }
  void add_local_next_event(const pg_log_entry_t& e) override {
    recovery_state.add_local_next_event(e);
  }
  bool pgb_is_primary() const override {
    return is_primary();
  }
  const OSDMapRef& pgb_get_osdmap() const override final {
    return get_osdmap();
  }
  epoch_t pgb_get_osdmap_epoch() const override final {
    return get_osdmap_epoch();
  }
  const pg_info_t &get_info() const override {
    return info;
  }
  const pg_pool_t &get_pool() const override {
    return pool.info;
  }

  ObjectContextRef get_obc(
    const hobject_t &hoid,
    const std::map<std::string, ceph::buffer::list, std::less<>> &attrs) override {
    return get_object_context(hoid, true, &attrs);
  }
  bool try_lock_for_read(
    const hobject_t &hoid,
    ObcLockManager &manager) override {
    if (is_missing_object(hoid))
      return false;
    auto obc = get_object_context(hoid, false, nullptr);
    if (!obc)
      return false;
    return manager.try_get_read_lock(hoid, obc);
  }

  void release_locks(ObcLockManager &manager) override {
    release_object_locks(manager);
  }
  void inc_osd_stat_repaired() override {
    osd->inc_osd_stat_repaired();
  }
  bool pg_is_remote_backfilling() override {
    return is_remote_backfilling();
  }
  void pg_add_local_num_bytes(int64_t num_bytes) override {
    add_local_num_bytes(num_bytes);
  }
  void pg_sub_local_num_bytes(int64_t num_bytes) override {
    sub_local_num_bytes(num_bytes);
  }
  void pg_add_num_bytes(int64_t num_bytes) override {
    add_num_bytes(num_bytes);
  }
  void pg_sub_num_bytes(int64_t num_bytes) override {
    sub_num_bytes(num_bytes);
  }
  void pgb_set_object_snap_mapping(
    const hobject_t &soid,
    const std::set<snapid_t> &snaps,
    ObjectStore::Transaction *t) override {
    return update_object_snap_mapping(t, soid, snaps);
  }
  void pgb_clear_object_snap_mapping(
    const hobject_t &soid,
    ObjectStore::Transaction *t) override {
    return clear_object_snap_mapping(t, soid);
  }
  void log_operation(
    std::vector<pg_log_entry_t>&& logv,
    const std::optional<pg_hit_set_history_t> &hset_history,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const eversion_t &min_last_complete_ondisk,
    bool transaction_applied,
    ObjectStore::Transaction &t,
    bool async = false) override {
    if (is_primary()) {
      ceph_assert(trim_to <= recovery_state.get_last_update_ondisk());
    }
    if (hset_history) {
      recovery_state.update_hset(*hset_history);
    }
    if (transaction_applied) {
      update_snap_map(logv, t);
    }
    auto last = logv.rbegin();
    if (is_primary() && last != logv.rend()) {
      projected_log.skip_can_rollback_to_to_head();
      projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
    }
    if (!is_primary() && !is_ec_pg()) {
      replica_clear_repop_obc(logv, t);
    }
    recovery_state.append_log(
      std::move(logv), trim_to, roll_forward_to, min_last_complete_ondisk,
      t, transaction_applied, async);
  }

  void replica_clear_repop_obc(
    const std::vector<pg_log_entry_t> &logv,
    ObjectStore::Transaction &t);

  void op_applied(const eversion_t &applied_version) override;
  bool should_send_op(
    pg_shard_t peer,
    const hobject_t &hoid) override;

  bool pg_is_undersized() const override {
    return is_undersized();
  }

  bool pg_is_repair() const override {
    return is_repair();
  }
  void update_peer_last_complete_ondisk(
    pg_shard_t fromosd,
    eversion_t lcod) override {
    recovery_state.update_peer_last_complete_ondisk(fromosd, lcod);
  }

  void update_last_complete_ondisk(
    eversion_t lcod) override {
    recovery_state.update_last_complete_ondisk(lcod);
  }

  void update_stats(
    const pg_stat_t &stat) override {
    recovery_state.update_stats(
      [&stat](auto &history, auto &stats) {
	stats = stat;
	return false;
      });
  }

  void schedule_recovery_work(
    GenContext<ThreadPool::TPHandle&> *c,
    uint64_t cost) override;
  pg_shard_t whoami_shard() const override {
    return pg_whoami;
  }
  spg_t primary_spg_t() const override {
    return spg_t(info.pgid.pgid, get_primary().shard);
  }
  pg_shard_t primary_shard() const override {
    return get_primary();
  }
  uint64_t min_peer_features() const override {
    return recovery_state.get_min_peer_features();
  }
  uint64_t min_upacting_features() const override {
    return recovery_state.get_min_upacting_features();
  }
  void send_message_osd_cluster(
    int peer, Message *m, epoch_t from_epoch) override {
    osd->send_message_osd_cluster(peer, m, from_epoch);
  }
  void send_message_osd_cluster(
    std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) override {
    osd->send_message_osd_cluster(messages, from_epoch);
  }
  void send_message_osd_cluster(
    MessageRef m, Connection *con) override {
    osd->send_message_osd_cluster(std::move(m), con);
  }
  void send_message_osd_cluster(
    Message *m, const ConnectionRef& con) override {
    osd->send_message_osd_cluster(m, con);
  }
  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
  entity_name_t get_cluster_msgr_name() override {
    return osd->get_cluster_msgr_name();
  }
  PerfCounters *get_logger() override;

  ceph_tid_t get_tid() override { return osd->get_tid(); }

  OstreamTemp clog_error() override { return osd->clog->error(); }
  OstreamTemp clog_warn() override { return osd->clog->warn(); }
  /**
   * a scrub-map arrived from a replica
   */
  void do_replica_scrub_map(OpRequestRef op);

  struct watch_disconnect_t {
    uint64_t cookie;
    entity_name_t name;
    bool send_disconnect;
    watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
      : cookie(c), name(n), send_disconnect(sd) {}
  };
  void complete_disconnect_watches(
    ObjectContextRef obc,
    const std::list<watch_disconnect_t> &to_disconnect);
  struct OpFinisher {
    virtual ~OpFinisher() {
    }

    virtual int execute() = 0;
  };
  /*
   * Capture all object state associated with an in-progress read or write.
   */
  struct OpContext {
    OpRequestRef op;
    osd_reqid_t reqid;
    std::vector<OSDOp> *ops;

    const ObjectState *obs; // Old objectstate
    const SnapSet *snapset; // Old snapset

    ObjectState new_obs;  // resulting ObjectState
    SnapSet new_snapset;  // resulting SnapSet (in case of a write)
    //pg_stat_t new_stats;  // resulting Stats
    object_stat_sum_t delta_stats;

    bool modify;          // (force) modification (even if op_t is empty)
    bool user_modify;     // user-visible modification
    bool undirty;         // user explicitly un-dirtying this object
    bool cache_operation;     ///< true if this is a cache eviction
    bool ignore_cache;    ///< true if IGNORE_CACHE flag is set
    bool ignore_log_op_stats;  // don't log op stats
    bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
    ObjectCleanRegions clean_regions;

    // side effects
    std::list<std::pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
    std::list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
    std::list<notify_info_t> notifies;
    struct NotifyAck {
      std::optional<uint64_t> watch_cookie;
      uint64_t notify_id;
      ceph::buffer::list reply_bl;
      explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
      NotifyAck(uint64_t notify_id, uint64_t cookie, ceph::buffer::list& rbl)
	: watch_cookie(cookie), notify_id(notify_id) {
	reply_bl = std::move(rbl);
      }
    };
    std::list<NotifyAck> notify_acks;

    uint64_t bytes_written, bytes_read;

    utime_t mtime;
    SnapContext snapc;           // writer snap context
    eversion_t at_version;       // pg's current version pointer
    version_t user_at_version;   // pg's current user version pointer

    /// index of the current subop - only valid inside of do_osd_ops()
    int current_osd_subop_num;
    /// total number of subops processed in this context for cls_cxx_subop_version()
    int processed_subop_count = 0;

    PGTransactionUPtr op_t;
    std::vector<pg_log_entry_t> log;
    std::optional<pg_hit_set_history_t> updated_hset_history;

    interval_set<uint64_t> modified_ranges;
    ObjectContextRef obc;
    ObjectContextRef clone_obc;    // if we created a clone
    ObjectContextRef head_obc;     // if we also update snapset (see trim_object)

    // FIXME: we may want to kill this msgr hint off at some point!
    std::optional<int> data_off = std::nullopt;

    MOSDOpReply *reply;

    PrimaryLogPG *pg;

    int num_read;    ///< count read ops
    int num_write;   ///< count update ops

    mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > extra_reqids;
    mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;

    hobject_t new_temp_oid, discard_temp_oid;  ///< temp objects we should start/stop tracking

    std::list<std::function<void()>> on_applied;
    std::list<std::function<void()>> on_committed;
    std::list<std::function<void()>> on_finish;
    std::list<std::function<void()>> on_success;
    template <typename F>
    void register_on_finish(F &&f) {
      on_finish.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_success(F &&f) {
      on_success.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_applied(F &&f) {
      on_applied.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_commit(F &&f) {
      on_committed.emplace_back(std::forward<F>(f));
    }
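
    // Example (illustrative only): completion work is typically queued with
    // a lambda, e.g.
    //
    //   ctx->register_on_commit([=]() { send_ack(); });  // send_ack() is
    //                                                    // hypothetical
    //
    // Each hook list holds std::function<void()>, so any zero-argument
    // callable works.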
    bool sent_reply = false;

    // pending async reads <off, len, op_flags> -> <outbl, outr>
    std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
			std::pair<ceph::buffer::list*, Context*> > > pending_async_reads;
    int inflightreads;
    friend struct OnReadComplete;
    void start_async_reads(PrimaryLogPG *pg);
    void finish_read(PrimaryLogPG *pg);
    bool async_reads_complete() {
      return inflightreads == 0;
    }

    RWState::State lock_type;
    ObcLockManager lock_manager;

    std::map<int, std::unique_ptr<OpFinisher>> op_finishers;

  private:
    OpContext(const OpContext& other);
    const OpContext& operator=(const OpContext& other);

  public:
    OpContext(OpRequestRef _op, osd_reqid_t _reqid, std::vector<OSDOp>* _ops,
	      ObjectContextRef& obc,
	      PrimaryLogPG *_pg) :
      op(_op), reqid(_reqid), ops(_ops),
      obs(&obc->obs),
      snapset(0),
      new_obs(obs->oi, obs->exists),
      modify(false), user_modify(false), undirty(false), cache_operation(false),
      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
      bytes_written(0), bytes_read(0), user_at_version(0),
      current_osd_subop_num(0),
      reply(NULL), pg(_pg),
      num_read(0), num_write(0),
      inflightreads(0),
      lock_type(RWState::RWNONE) {
      if (obc->ssc) {
	new_snapset = obc->ssc->snapset;
	snapset = &obc->ssc->snapset;
      }
    }
    OpContext(OpRequestRef _op, osd_reqid_t _reqid,
	      std::vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
      op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
      modify(false), user_modify(false), undirty(false), cache_operation(false),
      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
      bytes_written(0), bytes_read(0), user_at_version(0),
      current_osd_subop_num(0),
      reply(NULL), pg(_pg),
      num_read(0), num_write(0),
      inflightreads(0),
      lock_type(RWState::RWNONE) {}
    void reset_obs(ObjectContextRef obc) {
      new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
      if (obc->ssc) {
	new_snapset = obc->ssc->snapset;
	snapset = &obc->ssc->snapset;
      }
    }
    ~OpContext() {
      ceph_assert(!op_t);
      if (reply)
	reply->put();
      for (std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
			       std::pair<ceph::buffer::list*, Context*> > >::iterator i =
	     pending_async_reads.begin();
	   i != pending_async_reads.end();
	   pending_async_reads.erase(i++)) {
	delete i->second.second;
      }
    }
    uint64_t get_features() {
      if (op && op->get_req()) {
	return op->get_req()->get_connection()->get_features();
      }
      return -1ull;
    }
  };
  using OpContextUPtr = std::unique_ptr<OpContext>;
  friend struct OpContext;
  /*
   * State on the PG primary associated with the replicated mutation
   */
  class RepGather {
  public:
    hobject_t hoid;
    OpRequestRef op;
    xlist<RepGather*>::item queue_item;
    int nref;

    eversion_t v;
    int r = 0;

    ceph_tid_t rep_tid;

    bool all_committed;

    eversion_t pg_local_last_complete;

    ObcLockManager lock_manager;

    std::list<std::function<void()>> on_committed;
    std::list<std::function<void()>> on_success;
    std::list<std::function<void()>> on_finish;

    RepGather(
      OpContext *c, ceph_tid_t rt,
      eversion_t lc) :
      hoid(c->obc->obs.oi.soid),
      op(c->op),
      queue_item(this),
      nref(1),
      rep_tid(rt),
      all_committed(false),
      pg_local_last_complete(lc),
      lock_manager(std::move(c->lock_manager)),
      on_committed(std::move(c->on_committed)),
      on_success(std::move(c->on_success)),
      on_finish(std::move(c->on_finish)) {}

    RepGather(
      ObcLockManager &&manager,
      OpRequestRef &&o,
      std::optional<std::function<void(void)> > &&on_complete,
      ceph_tid_t rt,
      eversion_t lc,
      int r) :
      op(o),
      queue_item(this),
      nref(1),
      r(r),
      rep_tid(rt),
      all_committed(false),
      pg_local_last_complete(lc),
      lock_manager(std::move(manager)) {
      if (on_complete) {
	on_success.push_back(std::move(*on_complete));
      }
    }

    RepGather *get() {
      nref++;
      return this;
    }
    void put() {
      ceph_assert(nref > 0);
      if (--nref == 0) {
	delete this;
	//generic_dout(0) << "deleting " << this << dendl;
      }
    }
  };
  /**
   * Grabs locks for OpContext, should be cleaned up in close_op_ctx
   *
   * @param ctx [in,out] ctx to get locks for
   * @return true on success, false if we are queued
   */
  bool get_rw_locks(bool write_ordered, OpContext *ctx);

  /**
   * Cleans up OpContext
   *
   * @param ctx [in] ctx to clean up
   */
  void close_op_ctx(OpContext *ctx);

  /**
   * Releases locks
   *
   * @param manager [in] manager with locks to release
   *
   * (moved to .cc due to scrubber access)
   */
  void release_object_locks(ObcLockManager &lock_manager);
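
  /*
   * Typical lock lifecycle (a sketch inferred from the declarations above):
   *
   *   OpContext *ctx = ...;
   *   if (!get_rw_locks(write_ordered, ctx))
   *     return;               // op was queued; it will be retried later
   *   ...execute the op...
   *   close_op_ctx(ctx);      // drops the locks taken above
   */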
  xlist<RepGather*> repop_queue;

  friend class C_OSD_RepopCommit;
  void repop_all_committed(RepGather *repop);
  void eval_repop(RepGather*);
  void issue_repop(RepGather *repop, OpContext *ctx);
  RepGather *new_repop(
    OpContext *ctx,
    ceph_tid_t rep_tid);
  boost::intrusive_ptr<RepGather> new_repop(
    eversion_t version,
    int r,
    ObcLockManager &&manager,
    OpRequestRef &&op,
    std::optional<std::function<void(void)> > &&on_complete);
  void remove_repop(RepGather *repop);

  OpContextUPtr simple_opc_create(ObjectContextRef obc);
  void simple_opc_submit(OpContextUPtr ctx);
  /**
   * Merge entries atomically into all acting_recovery_backfill osds
   * adjusting missing and recovery state as necessary.
   *
   * Also used to store error log entries for dup detection.
   */
  void submit_log_entries(
    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
    ObcLockManager &&manager,
    std::optional<std::function<void(void)> > &&on_complete,
    OpRequestRef op = OpRequestRef(),
    int r = 0);
  struct LogUpdateCtx {
    boost::intrusive_ptr<RepGather> repop;
    std::set<pg_shard_t> waiting_on;
  };
  void cancel_log_updates();
  std::map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
  // hit sets
  HitSetRef hit_set;            ///< currently accumulating HitSet
  utime_t hit_set_start_stamp;  ///< time the current HitSet started recording

  void hit_set_clear();     ///< discard any HitSet state
  void hit_set_setup();     ///< initialize HitSet state
  void hit_set_create();    ///< create a new HitSet
  void hit_set_persist();   ///< persist hit info
  bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
  void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
  void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in-memory HitSets
  void hit_set_remove_all();

  hobject_t get_hit_set_current_object(utime_t stamp);
  hobject_t get_hit_set_archive_object(utime_t start,
				       utime_t end,
				       bool using_gmt);
  // agent
  boost::scoped_ptr<TierAgentState> agent_state;

  void agent_setup();       ///< initialize agent state
  bool agent_work(int max) override  ///< entry point to do some agent work
  {
    return agent_work(max, max);
  }
  bool agent_work(int max, int agent_flush_quota) override;
  bool agent_maybe_flush(ObjectContextRef& obc);  ///< maybe flush
  bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush);  ///< maybe evict

  void agent_load_hit_sets();  ///< load HitSets, if needed

  /// estimate object atime and temperature
  ///
  /// @param oid [in] object name
  /// @param temperature [out] relative temperature (considers both access time and frequency)
  void agent_estimate_temp(const hobject_t& oid, int *temperature);

  /// stop the agent
  void agent_stop() override;
  void agent_delay() override;

  /// clear agent state
  void agent_clear() override;

  /// choose (new) agent mode(s), returns true if op is requeued
  bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
  void agent_choose_mode_restart() override;

  /// true if we can send an ondisk/commit for v
  bool already_complete(eversion_t v);
  // projected object info
  SharedLRU<hobject_t, ObjectContext> object_contexts;
  // map from oid.snapdir() to SnapSetContext *
  std::map<hobject_t, SnapSetContext*> snapset_contexts;
  ceph::mutex snapset_contexts_lock =
    ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock");

  // debug order that client ops are applied
  std::map<hobject_t, std::map<client_t, ceph_tid_t>> debug_op_order;
  void populate_obc_watchers(ObjectContextRef obc);
  void check_blocklisted_obc_watchers(ObjectContextRef obc);
  void check_blocklisted_watchers() override;
  void get_watchers(std::list<obj_watch_item_t> *ls) override;
  void get_obc_watchers(ObjectContextRef obc, std::list<obj_watch_item_t> &pg_watchers);
  void handle_watch_timeout(WatchRef watch);
  ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
  ObjectContextRef get_object_context(
    const hobject_t& soid,
    bool can_create,
    const std::map<std::string, ceph::buffer::list, std::less<>> *attrs = 0
    );

  void context_registry_on_change();
  void object_context_destructor_callback(ObjectContext *obc);
  class C_PG_ObjectContext;

  int find_object_context(const hobject_t& oid,
			  ObjectContextRef *pobc,
			  bool can_create,
			  bool map_snapid_to_clone=false,
			  hobject_t *missing_oid=NULL);

  void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);

  void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);

  SnapSetContext *get_snapset_context(
    const hobject_t& oid,
    bool can_create,
    const std::map<std::string, ceph::buffer::list, std::less<>> *attrs = 0,
    bool oid_existed = true // indicates whether this oid existed in the backend
    );
  void register_snapset_context(SnapSetContext *ssc) {
    std::lock_guard l(snapset_contexts_lock);
    _register_snapset_context(ssc);
  }
  void _register_snapset_context(SnapSetContext *ssc) {
    ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock));
    if (!ssc->registered) {
      ceph_assert(snapset_contexts.count(ssc->oid) == 0);
      ssc->registered = true;
      snapset_contexts[ssc->oid] = ssc;
    }
  }
  void put_snapset_context(SnapSetContext *ssc);
  std::map<hobject_t, ObjectContextRef> recovering;

  /*
   * Backfill
   *
   * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
   *
   * objects prior to peer_info[backfill_target].last_backfill
   * - are on the peer
   * - are included in the peer stats
   *
   * objects \in (last_backfill, last_backfill_started]
   * - are on the peer or are in backfills_in_flight
   * - are not included in pg stats (yet)
   * - have their stats in pending_backfill_updates on the primary
   */
  std::set<hobject_t> backfills_in_flight;
  std::map<hobject_t, pg_stat_t> pending_backfill_updates;
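
  /* Worked example (object hashes are illustrative): if the peer's
   * last_backfill is object H and last_backfill_started is K, an object J
   * with H < J <= K is either already on the peer or in backfills_in_flight,
   * and its stats live in pending_backfill_updates rather than the pg stats;
   * an object ordered at or before H is on the peer and already counted.
   */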
  void dump_recovery_info(ceph::Formatter *f) const override {
    f->open_array_section("waiting_on_backfill");
    for (std::set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
	 p != waiting_on_backfill.end(); ++p)
      f->dump_stream("osd") << *p;
    f->close_section();
    f->dump_stream("last_backfill_started") << last_backfill_started;
    {
      f->open_object_section("backfill_info");
      backfill_info.dump(f);
      f->close_section();
    }
    {
      f->open_array_section("peer_backfill_info");
      for (std::map<pg_shard_t, BackfillInterval>::const_iterator pbi =
	     peer_backfill_info.begin();
	   pbi != peer_backfill_info.end(); ++pbi) {
	f->dump_stream("osd") << pbi->first;
	f->open_object_section("BackfillInterval");
	pbi->second.dump(f);
	f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("backfills_in_flight");
      for (std::set<hobject_t>::const_iterator i = backfills_in_flight.begin();
	   i != backfills_in_flight.end();
	   ++i) {
	f->dump_stream("object") << *i;
      }
      f->close_section();
    }
    {
      f->open_array_section("recovering");
      for (std::map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
	   i != recovering.end();
	   ++i) {
	f->dump_stream("object") << i->first;
      }
      f->close_section();
    }
    {
      f->open_object_section("pg_backend");
      pgbackend->dump_recovery_info(f);
      f->close_section();
    }
  }
  /// last backfill operation started
  hobject_t last_backfill_started;

  int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
				 PGBackend::RecoveryHandle *h,
				 bool *work_started);
  int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
				  PGBackend::RecoveryHandle *h,
				  bool *work_started);

  void finish_degraded_object(const hobject_t oid);
  // Cancels/resets pulls from peer
  void check_recovery_sources(const OSDMapRef& map) override;

  int recover_missing(
    const hobject_t& oid,
    eversion_t v,
    int priority,
    PGBackend::RecoveryHandle *h);

  // low level ops
  void _make_clone(
    OpContext *ctx,
    PGTransaction* t,
    ObjectContextRef clone_obc,
    const hobject_t& head, const hobject_t& coid,
    object_info_t *poi);
  void execute_ctx(OpContext *ctx);
  void finish_ctx(OpContext *ctx, int log_op_type, int result=0);
  void reply_ctx(OpContext *ctx, int err);
  void make_writeable(OpContext *ctx);
  void log_op_stats(const OpRequest& op, uint64_t inb, uint64_t outb);

  void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
				   interval_set<uint64_t>& modified, uint64_t offset,
				   uint64_t length, bool write_full=false);
  inline void truncate_update_size_and_usage(
    object_stat_sum_t& delta_stats,
    object_info_t& oi,
    uint64_t truncate_size);
  enum class cache_result_t {
    NOOP,
    BLOCKED_FULL,
    BLOCKED_PROMOTE,
    HANDLED_PROXY,
    HANDLED_REDIRECT,
    REPLIED_WITH_EAGAIN,
    BLOCKED_RECOVERY,
  };
  cache_result_t maybe_handle_cache_detail(OpRequestRef op,
					   bool write_ordered,
					   ObjectContextRef obc, int r,
					   hobject_t missing_oid,
					   bool must_promote,
					   bool in_hit_set,
					   ObjectContextRef *promote_obc);
  cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
					      bool write_ordered,
					      ObjectContextRef obc);
  bool maybe_handle_manifest(OpRequestRef op,
			     bool write_ordered,
			     ObjectContextRef obc) {
    return cache_result_t::NOOP != maybe_handle_manifest_detail(
      op,
      write_ordered,
      obc);
  }
  /**
   * This helper function is called from do_op if the ObjectContext lookup fails.
   * @returns true if the caching code is handling the Op, false otherwise.
   */
  bool maybe_handle_cache(OpRequestRef op,
			  bool write_ordered,
			  ObjectContextRef obc, int r,
			  const hobject_t& missing_oid,
			  bool must_promote,
			  bool in_hit_set = false) {
    return cache_result_t::NOOP != maybe_handle_cache_detail(
      op,
      write_ordered,
      obc,
      r,
      missing_oid,
      must_promote,
      in_hit_set,
      nullptr);
  }
  /**
   * This helper function checks if a promotion is needed.
   */
  bool maybe_promote(ObjectContextRef obc,
		     const hobject_t& missing_oid,
		     const object_locator_t& oloc,
		     bool in_hit_set,
		     uint32_t recency,
		     OpRequestRef promote_op,
		     ObjectContextRef *promote_obc = nullptr);
  /**
   * This helper function tells the client to redirect their request elsewhere.
   */
  void do_cache_redirect(OpRequestRef op);
  /**
   * This function attempts to start a promote.  Either it succeeds,
   * or places op on a wait list.  If op is null, failure means that
   * this is a noop.  If a future user wants to be able to distinguish
   * these cases, a return value should be added.
   */
  void promote_object(
    ObjectContextRef obc,            ///< [optional] obc
    const hobject_t& missing_object, ///< oid (if !obc)
    const object_locator_t& oloc,    ///< locator for obc|oid
    OpRequestRef op,                 ///< [optional] client op
    ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
    );
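
  // Example call shapes (sketch): with an obc in hand,
  //   promote_object(obc, hobject_t(), oloc, op);
  // or, after a failed lookup, with only the missing oid,
  //   promote_object(ObjectContextRef(), missing_oid, oloc, op);
  // Passing a null op makes a failed promote a no-op, per the comment above.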
  int prepare_transaction(OpContext *ctx);
  std::list<std::pair<OpRequestRef, OpContext*> > in_progress_async_reads;
  void complete_read_ctx(int result, OpContext *ctx);

  // pg on-disk content
  void check_local() override;

  void _clear_recovery_state() override;
  bool start_recovery_ops(
    uint64_t max,
    ThreadPool::TPHandle &handle, uint64_t *started) override;

  uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
  uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle,
			    bool *recovery_started);
  hobject_t earliest_peer_backfill() const;
  bool all_peer_done() const;
  /**
   * @param work_started will be set to true if recover_backfill got anywhere
   * @returns the number of operations started
   */
  uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
			    bool *work_started);
  /**
   * scan a (hash) range of objects in the current pg
   *
   * @min return at least this many items, unless we are done
   * @max return no more than this many items
   * @bi.begin first item should be >= this value
   * @bi [out] resulting map of objects to eversion_t's
   */
  void scan_range(
    int min, int max, BackfillInterval *bi,
    ThreadPool::TPHandle &handle
    );

  /// Update a hash range to reflect changes since the last scan
  void update_range(
    BackfillInterval *bi,        ///< [in,out] interval to update
    ThreadPool::TPHandle &handle ///< [in] tp handle
    );

  int prep_backfill_object_push(
    hobject_t oid, eversion_t v, ObjectContextRef obc,
    std::vector<pg_shard_t> peers,
    PGBackend::RecoveryHandle *h);
  void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
  class C_OSD_AppliedRecoveredObject;
  class C_OSD_CommittedPushedObject;
  class C_OSD_AppliedRecoveredObjectReplica;

  void _applied_recovered_object(ObjectContextRef obc);
  void _applied_recovered_object_replica();
  void _committed_pushed_object(epoch_t epoch, eversion_t lc);
  void recover_got(hobject_t oid, eversion_t v);

  // -- copyfrom --
  std::map<hobject_t, CopyOpRef> copy_ops;
  int do_copy_get(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& op,
		  ObjectContextRef& obc);
  int finish_copy_get();

  void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
			      OSDOp& osd_op);
  /**
   * To copy an object, call start_copy.
   *
   * @param cb: The CopyCallback to be activated when the copy is complete
   * @param obc: The ObjectContext we are copying into
   * @param src: The source object
   * @param oloc: the source object locator
   * @param version: the version of the source object to copy (0 for any)
   */
  void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
		  object_locator_t oloc, version_t version, unsigned flags,
		  bool mirror_snapset, unsigned src_obj_fadvise_flags,
		  unsigned dest_obj_fadvise_flags);
  void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
  void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
  uint64_t get_copy_chunk_size() const {
    uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
    if (pool.info.required_alignment()) {
      uint64_t alignment = pool.info.required_alignment();
      if (size % alignment) {
	size += alignment - (size % alignment);
      }
    }
    return size;
  }
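
  // e.g. osd_copyfrom_max_chunk = 1000000 with required_alignment() = 65536
  // gives 1000000 % 65536 = 16960, so the chunk size is rounded up by
  // 65536 - 16960 = 48576 to 1048576 (16 * 65536); with no pool alignment
  // the configured value is used as-is.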
  void _copy_some(ObjectContextRef obc, CopyOpRef cop);
  void finish_copyfrom(CopyFromCallback *cb);
  void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
  void cancel_copy(CopyOpRef cop, bool requeue, std::vector<ceph_tid_t> *tids);
  void cancel_copy_ops(bool requeue, std::vector<ceph_tid_t> *tids);

  friend struct C_Copyfrom;
  // -- flush --
  std::map<hobject_t, FlushOpRef> flush_ops;

  /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
  int start_flush(
    OpRequestRef op, ObjectContextRef obc,
    bool blocking, hobject_t *pmissing,
    std::optional<std::function<void()>> &&on_flush,
    bool force_dedup = false);
  void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
  int try_flush_mark_clean(FlushOpRef fop);
  void cancel_flush(FlushOpRef fop, bool requeue, std::vector<ceph_tid_t> *tids);
  void cancel_flush_ops(bool requeue, std::vector<ceph_tid_t> *tids);

  /// @return false if the clone has been evicted
  bool is_present_clone(hobject_t coid);

  friend struct C_Flush;
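
  // Caller-side sketch of the ownership rule above: only treat on_flush as
  // consumed when start_flush returns -EINPROGRESS, e.g.
  //
  //   std::optional<std::function<void()>> on_flush = ...;
  //   int r = start_flush(op, obc, blocking, &pmissing, std::move(on_flush));
  //   if (r != -EINPROGRESS) {
  //     // on_flush was not taken; the caller still owns the callback
  //   }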
  // -- cls_gather --
  std::map<hobject_t, CLSGatherOp> cls_gather_ops;
  void cancel_cls_gather(std::map<hobject_t,CLSGatherOp>::iterator iter, bool requeue, std::vector<ceph_tid_t> *tids);
  void cancel_cls_gather_ops(bool requeue, std::vector<ceph_tid_t> *tids);
  bool _range_available_for_scrub(
    const hobject_t &begin, const hobject_t &end) override;

  void _split_into(pg_t child_pgid, PG *child,
		   unsigned split_bits) override;
  void apply_and_flush_repops(bool requeue);
  int do_xattr_cmp_u64(int op, uint64_t v1, ceph::buffer::list& xattr);
  int do_xattr_cmp_str(int op, std::string& v1s, ceph::buffer::list& xattr);

  // -- checksum --
  int do_checksum(OpContext *ctx, OSDOp& osd_op, ceph::buffer::list::const_iterator *bl_it);
  int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
		      ceph::buffer::list::const_iterator *init_value_bl_it,
		      const ceph::buffer::list &read_bl);

  friend struct C_ChecksumRead;

  int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
  int finish_extent_cmp(OSDOp& osd_op, const ceph::buffer::list &read_bl);

  friend struct C_ExtentCmpRead;

  int do_read(OpContext *ctx, OSDOp& osd_op);
  int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
  int do_writesame(OpContext *ctx, OSDOp& osd_op);

  bool pgls_filter(const PGLSFilter& filter, const hobject_t& sobj);

  std::pair<int, std::unique_ptr<const PGLSFilter>> get_pgls_filter(
    ceph::buffer::list::const_iterator& iter);
  std::map<hobject_t, std::list<OpRequestRef>> in_progress_proxy_ops;
  void kick_proxy_ops_blocked(hobject_t& soid);
  void cancel_proxy_ops(bool requeue, std::vector<ceph_tid_t> *tids);

  // -- proxyread --
  std::map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;

  void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
  void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
  void cancel_proxy_read(ProxyReadOpRef prdop, std::vector<ceph_tid_t> *tids);

  friend struct C_ProxyRead;

  // -- proxywrite --
  std::map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;

  void do_proxy_write(OpRequestRef op, ObjectContextRef obc = NULL);
  void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
  void cancel_proxy_write(ProxyWriteOpRef pwop, std::vector<ceph_tid_t> *tids);

  friend struct C_ProxyWrite_Commit;
  // -- chunkop --
  enum class refcount_t {
    INCREMENT_REF,
    DECREMENT_REF,
    CREATE_OR_GET_REF,
  };
  void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
			   ObjectContextRef obc, bool write_ordered);
  void do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
			     uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
			     uint64_t req_total_len, bool write_ordered);
  bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc);
  void _copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset);
  void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
  void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
  void cancel_and_requeue_proxy_ops(hobject_t oid);
  void cancel_manifest_ops(bool requeue, std::vector<ceph_tid_t> *tids);
  ceph_tid_t refcount_manifest(hobject_t src_soid, hobject_t tgt_soid, refcount_t type,
			       Context *cb, std::optional<bufferlist> chunk);
  void dec_all_refcount_manifest(const object_info_t& oi, OpContext* ctx);
  void dec_refcount(const hobject_t& soid, const object_ref_delta_t& refs);
  void update_chunk_map_by_dirty(OpContext* ctx);
  void dec_refcount_by_dirty(OpContext* ctx);
  ObjectContextRef get_prev_clone_obc(ObjectContextRef obc);
  bool recover_adjacent_clones(ObjectContextRef obc, OpRequestRef op);
  void get_adjacent_clones(ObjectContextRef src_obc,
			   ObjectContextRef& _l, ObjectContextRef& _g);
  bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
			   OSDOp& osd_op);
  int do_cdc(const object_info_t& oi, std::map<uint64_t, chunk_info_t>& chunk_map,
	     std::map<uint64_t, bufferlist>& chunks);
  int start_dedup(OpRequestRef op, ObjectContextRef obc);
  std::pair<int, hobject_t> get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk);
  int finish_set_dedup(hobject_t oid, int r, ceph_tid_t tid, uint64_t offset);
  int finish_set_manifest_refcount(hobject_t oid, int r, ceph_tid_t tid, uint64_t offset);

  friend struct C_ProxyChunkRead;
  friend class PromoteManifestCallback;
  friend struct C_CopyChunk;
  friend struct RefCountCallback;
  friend struct C_SetDedupChunks;
  friend struct C_SetManifestRefCountDone;
  friend struct SetManifestFinisher;
public:
  PrimaryLogPG(OSDService *o, OSDMapRef curmap,
	       const PGPool &_pool,
	       const std::map<std::string,std::string>& ec_profile,
	       spg_t p);
  ~PrimaryLogPG() override;

  void do_command(
    const std::string_view& prefix,
    const cmdmap_t& cmdmap,
    const ceph::buffer::list& idata,
    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) override;

  void clear_cache() override;
  int get_cache_obj_count() override {
    return object_contexts.get_count();
  }

  unsigned get_pg_shard() const {
    return info.pgid.hash_to_shard(osd->get_num_shards());
  }
  void do_request(
    OpRequestRef& op,
    ThreadPool::TPHandle &handle) override;
  void do_op(OpRequestRef& op);
  void record_write_error(OpRequestRef op, const hobject_t &soid,
			  MOSDOpReply *orig_reply, int r,
			  OpContext *ctx_for_op_returns=nullptr);
  void do_pg_op(OpRequestRef op);
  void do_scan(
    OpRequestRef op,
    ThreadPool::TPHandle &handle);
  void do_backfill(OpRequestRef op);
  void do_backfill_remove(OpRequestRef op);

  void handle_backoff(OpRequestRef& op);
  int trim_object(bool first, const hobject_t &coid, snapid_t snap_to_trim,
		  OpContextUPtr *ctxp);
  void snap_trimmer(epoch_t e) override;
  void kick_snap_trim() override;
  void snap_trimmer_scrub_complete() override;
  int do_osd_ops(OpContext *ctx, std::vector<OSDOp>& ops);
  int _get_tmap(OpContext *ctx, ceph::buffer::list *header, ceph::buffer::list *vals);
  int do_tmap2omap(OpContext *ctx, unsigned flags);
  int do_tmapup(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op);
  int do_tmapup_slow(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op, ceph::buffer::list& bl);

  void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
  int start_cls_gather(OpContext *ctx, std::map<std::string, bufferlist> *src_objs, const std::string& pool,
		       const char *cls, const char *method, bufferlist& inbl);
private:
  int do_scrub_ls(const MOSDOp *op, OSDOp *osd_op);
  bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
  uint64_t temp_seq; ///< last id for naming temp objects
  /// generate a new temp object name
  hobject_t generate_temp_object(const hobject_t& target);
  /// generate a new temp object name (for recovery)
  hobject_t get_temp_recovery_object(const hobject_t& target,
				     eversion_t version) override;
  void split_colls(
    spg_t child,
    int split_bits,
    int seed,
    const pg_pool_t *pool,
    ObjectStore::Transaction &t) override {
    coll_t target = coll_t(child);
    create_pg_collection(t, child, split_bits);
    t.split_collection(
      coll,
      split_bits,
      seed,
      target);
    init_pg_ondisk(t, child, pool);
  }
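
  // Sketch of what a split looks like: when pg_num doubles from 8 to 16,
  // pg 1.3 (seed 0x3) splits off child 1.b (seed 0x3 + 8); split_colls
  // creates the child collection and moves over the objects whose hash now
  // maps to the child. The concrete pgids here are illustrative.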
  struct DoSnapWork : boost::statechart::event< DoSnapWork > {
    DoSnapWork() : boost::statechart::event< DoSnapWork >() {}
  };
  struct KickTrim : boost::statechart::event< KickTrim > {
    KickTrim() : boost::statechart::event< KickTrim >() {}
  };
  struct RepopsComplete : boost::statechart::event< RepopsComplete > {
    RepopsComplete() : boost::statechart::event< RepopsComplete >() {}
  };
  struct ScrubComplete : boost::statechart::event< ScrubComplete > {
    ScrubComplete() : boost::statechart::event< ScrubComplete >() {}
  };
  struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
    TrimWriteUnblocked() : boost::statechart::event< TrimWriteUnblocked >() {}
  };
  struct Reset : boost::statechart::event< Reset > {
    Reset() : boost::statechart::event< Reset >() {}
  };
  struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
    SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
  };
  struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
    SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
  };
  struct NotTrimming;
  struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
    PrimaryLogPG *pg;
    explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
    void log_enter(const char *state_name);
    void log_exit(const char *state_name, utime_t duration);
    bool permit_trim();
    bool can_trim() {
      return
	permit_trim() &&
	!pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOSNAPTRIM);
    }
  } snap_trimmer_machine;
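
  /* State flow (sketch from the states declared below, happy path only;
   * some reactions are defined in the .cc):
   *
   *   NotTrimming --KickTrim--> Trimming/WaitReservation
   *     --SnapTrimReserved--> AwaitAsyncWork --DoSnapWork--> WaitRepops
   *     --RepopsComplete--> WaitTrimTimer --SnapTrimTimerReady--> AwaitAsyncWork
   *
   * A Reset event returns to NotTrimming from any Trimming sub-state, which
   * also cancels the snap reservation (see Trimming::exit()).
   */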
  struct WaitReservation;
  struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;

    std::set<hobject_t> in_flight;
    snapid_t snap_to_trim;

    explicit Trimming(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "Trimming") {
      context< SnapTrimmer >().log_enter(state_name);
      ceph_assert(context< SnapTrimmer >().permit_trim());
      ceph_assert(in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      auto *pg = context< SnapTrimmer >().pg;
      pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
      pg->state_clear(PG_STATE_SNAPTRIM);
      pg->publish_stats_to_osd();
    }
    boost::statechart::result react(const KickTrim&) {
      return discard_event();
    }
  };
  /* SnapTrimmerStates */
  struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< SnapTrimTimerReady >
      > reactions;
    Context *wakeup = nullptr;
    explicit WaitTrimTimer(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "Trimming/WaitTrimTimer") {
      context< SnapTrimmer >().log_enter(state_name);
      ceph_assert(context<Trimming>().in_flight.empty());
      struct OnTimer : Context {
	PrimaryLogPGRef pg;
	epoch_t epoch;
	OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
	void finish(int) override {
	  pg->lock();
	  if (!pg->pg_has_reset_since(epoch))
	    pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
	  pg->unlock();
	}
      };
      auto *pg = context< SnapTrimmer >().pg;
      float osd_snap_trim_sleep = pg->osd->osd->get_osd_snap_trim_sleep();
      if (osd_snap_trim_sleep > 0) {
	std::lock_guard l(pg->osd->sleep_lock);
	wakeup = pg->osd->sleep_timer.add_event_after(
	  osd_snap_trim_sleep,
	  new OnTimer{pg, pg->get_osdmap_epoch()});
      } else {
	post_event(SnapTrimTimerReady());
      }
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      auto *pg = context< SnapTrimmer >().pg;
      if (wakeup) {
	std::lock_guard l(pg->osd->sleep_lock);
	pg->osd->sleep_timer.cancel_event(wakeup);
	wakeup = nullptr;
      }
    }
    boost::statechart::result react(const SnapTrimTimerReady&) {
      wakeup = nullptr;
      if (!context< SnapTrimmer >().can_trim()) {
	post_event(KickTrim());
	return transit< NotTrimming >();
      } else {
	return transit< AwaitAsyncWork >();
      }
    }
  };
  struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< TrimWriteUnblocked >
      > reactions;
    explicit WaitRWLock(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "Trimming/WaitRWLock") {
      context< SnapTrimmer >().log_enter(state_name);
      ceph_assert(context<Trimming>().in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const TrimWriteUnblocked&) {
      if (!context< SnapTrimmer >().can_trim()) {
	post_event(KickTrim());
	return transit< NotTrimming >();
      } else {
	return transit< AwaitAsyncWork >();
      }
    }
  };
  struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< RepopsComplete >
      > reactions;
    explicit WaitRepops(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "Trimming/WaitRepops") {
      context< SnapTrimmer >().log_enter(state_name);
      ceph_assert(!context<Trimming>().in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const RepopsComplete&) {
      if (!context< SnapTrimmer >().can_trim()) {
	post_event(KickTrim());
	return transit< NotTrimming >();
      } else {
	return transit< WaitTrimTimer >();
      }
    }
  };
  struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< DoSnapWork >
      > reactions;
    explicit AwaitAsyncWork(my_context ctx);
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const DoSnapWork&);
  };
  struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
    /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
     * always cancels the reservation */
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< SnapTrimReserved >
      > reactions;
    struct ReservationCB : public Context {
      PrimaryLogPGRef pg;
      bool canceled;
      explicit ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
      void finish(int) override {
	pg->lock();
	if (!canceled)
	  pg->snap_trimmer_machine.process_event(SnapTrimReserved());
	pg->unlock();
      }
      void cancel() {
	ceph_assert(pg->is_locked());
	ceph_assert(!canceled);
	canceled = true;
      }
    };
    ReservationCB *pending = nullptr;

    explicit WaitReservation(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "Trimming/WaitReservation") {
      context< SnapTrimmer >().log_enter(state_name);
      ceph_assert(context<Trimming>().in_flight.empty());
      auto *pg = context< SnapTrimmer >().pg;
      pending = new ReservationCB(pg);
      pg->osd->snap_reserver.request_reservation(
	pg->get_pgid(),
	pending,
	0);
      pg->state_set(PG_STATE_SNAPTRIM_WAIT);
      pg->publish_stats_to_osd();
    }
    boost::statechart::result react(const SnapTrimReserved&);
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      if (pending)
	pending->cancel();
      pending = nullptr;
      auto *pg = context< SnapTrimmer >().pg;
      pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
      pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
      pg->publish_stats_to_osd();
    }
  };
  struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< ScrubComplete >,
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;
    explicit WaitScrub(my_context ctx)
      : my_base(ctx),
	NamedState(nullptr, "WaitScrub") {
      context< SnapTrimmer >().log_enter(state_name);
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const ScrubComplete&) {
      post_event(KickTrim());
      return transit< NotTrimming >();
    }
    boost::statechart::result react(const KickTrim&) {
      return discard_event();
    }
  };
  struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;
    explicit NotTrimming(my_context ctx);
    void exit();
    boost::statechart::result react(const KickTrim&);
  };
  int _verify_no_head_clones(const hobject_t& soid,
			     const SnapSet& ss);
  // return true if we're creating a local object, false for a
  // whiteout or no change.
  void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
  int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
  int _rollback_to(OpContext *ctx, OSDOp& op);
  void _do_rollback_to(OpContext *ctx, ObjectContextRef rollback_to,
		       OSDOp& op);

  bool is_missing_object(const hobject_t& oid) const;
  bool is_unreadable_object(const hobject_t &oid) const {
    return is_missing_object(oid) ||
      !recovery_state.get_missing_loc().readable_with_acting(
	oid, get_actingset());
  }
  void maybe_kick_recovery(const hobject_t &soid);
  void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);

  int get_manifest_ref_count(ObjectContextRef obc, std::string& fp_oid, OpRequestRef op);

  bool check_laggy(OpRequestRef& op);
  bool check_laggy_requeue(OpRequestRef& op);
  void recheck_readable() override;
  bool is_backfill_target(pg_shard_t osd) const {
    return recovery_state.is_backfill_target(osd);
  }
  const std::set<pg_shard_t> &get_backfill_targets() const {
    return recovery_state.get_backfill_targets();
  }
  bool is_async_recovery_target(pg_shard_t peer) const {
    return recovery_state.is_async_recovery_target(peer);
  }
  const std::set<pg_shard_t> &get_async_recovery_targets() const {
    return recovery_state.get_async_recovery_targets();
  }
  bool is_degraded_or_backfilling_object(const hobject_t& oid);
  bool is_degraded_on_async_recovery_target(const hobject_t& soid);
  void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
  void block_write_on_full_cache(
    const hobject_t& oid, OpRequestRef op);
  void block_for_clean(
    const hobject_t& oid, OpRequestRef op);
  void block_write_on_snap_rollback(
    const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
  void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);

  bool maybe_await_blocked_head(const hobject_t &soid, OpRequestRef op);
  void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
  void kick_object_context_blocked(ObjectContextRef obc);
  void requeue_op_blocked_by_object(const hobject_t &soid);

  void maybe_force_recovery();
  void mark_all_unfound_lost(
    int what,
    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
  eversion_t pick_newest_available(const hobject_t& oid);

  void do_update_log_missing(
    OpRequestRef &op);

  void do_update_log_missing_reply(
    OpRequestRef &op);

  void plpg_on_role_change() override;
  void plpg_on_pool_change() override;
  void clear_async_reads();
  void on_change(ObjectStore::Transaction &t) override;
  void on_activate_complete() override;
  void on_flushed() override;
  void on_removal(ObjectStore::Transaction &t) override;
  void on_shutdown() override;
  bool check_failsafe_full() override;
  bool maybe_preempt_replica_scrub(const hobject_t& oid) override;
  int rep_repair_primary_object(const hobject_t& soid, OpContext *ctx);
  // attr cache handling
  void setattr_maybe_cache(
    ObjectContextRef obc,
    PGTransaction *t,
    const std::string &key,
    ceph::buffer::list &val);
  void setattrs_maybe_cache(
    ObjectContextRef obc,
    PGTransaction *t,
    std::map<std::string, ceph::buffer::list, std::less<>> &attrs);
  void rmattr_maybe_cache(
    ObjectContextRef obc,
    PGTransaction *t,
    const std::string &key);
  /**
   * getattr_maybe_cache
   *
   * Populates val (if non-null) with the value of the attr with the specified key.
   * Returns -ENOENT if the object does not exist, -ENODATA if the object exists,
   * but the specified key does not.
   */
  int getattr_maybe_cache(
    ObjectContextRef obc,
    const std::string &key,
    ceph::buffer::list *val);
  int getattrs_maybe_cache(
    ObjectContextRef obc,
    std::map<std::string, ceph::buffer::list, std::less<>> *out);
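
  // Usage sketch for the return-code contract documented above (OI_ATTR is
  // the object_info xattr key used across the OSD; any key works):
  //
  //   ceph::buffer::list bl;
  //   int r = getattr_maybe_cache(obc, OI_ATTR, &bl);
  //   if (r == -ENOENT)  { /* object does not exist */ }
  //   if (r == -ENODATA) { /* object exists, key does not */ }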
  void set_dynamic_perf_stats_queries(
    const std::list<OSDPerfMetricQuery> &queries) override;
  void get_dynamic_perf_stats(DynamicPerfStats *stats) override;

private:
  DynamicPerfStats m_dynamic_perf_stats;
};
inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
{
  out << "repgather(" << &repop
      << " " << repop.v
      << " rep_tid=" << repop.rep_tid
      << " committed?=" << repop.all_committed
      << " r=" << repop.r
      << ")";
  return out;
}

inline ostream& operator<<(ostream& out,
			   const PrimaryLogPG::ProxyWriteOpRef& pwop)
{
  out << "proxywrite(" << &pwop
      << " " << pwop->user_version
      << " pwop_tid=" << pwop->objecter_tid;
  if (pwop->ctx->op)
    out << " op=" << *(pwop->ctx->op->get_req());
  out << ")";
  return out;
}
void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);

#endif