1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include "msg/Dispatcher.h"
22 #include "common/Mutex.h"
23 #include "common/RWLock.h"
24 #include "common/Timer.h"
25 #include "common/WorkQueue.h"
26 #include "common/AsyncReserver.h"
27 #include "common/ceph_context.h"
28 #include "common/zipkin_trace.h"
30 #include "mgr/MgrClient.h"
32 #include "os/ObjectStore.h"
35 #include "auth/KeyRing.h"
36 #include "osd/ClassHandler.h"
38 #include "include/CompatSet.h"
40 #include "OpRequest.h"
46 #include "include/memory.h"
49 #include "include/unordered_map.h"
51 #include "common/shared_cache.hpp"
52 #include "common/simple_cache.hpp"
53 #include "common/sharedptr_registry.hpp"
54 #include "common/WeightedPriorityQueue.h"
55 #include "common/PrioritizedQueue.h"
56 #include "messages/MOSDOp.h"
57 #include "include/Spinlock.h"
58 #include "common/EventTrace.h"
60 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
75 l_osd_op_r_lat_outb_hist
,
76 l_osd_op_r_process_lat
,
77 l_osd_op_r_prepare_lat
,
81 l_osd_op_w_lat_inb_hist
,
82 l_osd_op_w_process_lat
,
83 l_osd_op_w_prepare_lat
,
88 l_osd_op_rw_lat_inb_hist
,
89 l_osd_op_rw_lat_outb_hist
,
90 l_osd_op_rw_process_lat
,
91 l_osd_op_rw_prepare_lat
,
113 l_osd_history_alloc_bytes
,
114 l_osd_history_alloc_num
,
116 l_osd_cached_crc_adjusted
,
128 l_osd_waiting_for_map
,
131 l_osd_map_cache_miss
,
132 l_osd_map_cache_miss_low
,
133 l_osd_map_cache_miss_low_avg
,
134 l_osd_map_bl_cache_hit
,
135 l_osd_map_bl_cache_miss
,
138 l_osd_stat_bytes_used
,
139 l_osd_stat_bytes_avail
,
145 l_osd_tier_flush_fail
,
146 l_osd_tier_try_flush
,
147 l_osd_tier_try_flush_fail
,
153 l_osd_tier_proxy_read
,
154 l_osd_tier_proxy_write
,
161 l_osd_object_ctx_cache_hit
,
162 l_osd_object_ctx_cache_total
,
165 l_osd_tier_flush_lat
,
166 l_osd_tier_promote_lat
,
176 // RecoveryState perf counters
185 rs_backfilling_latency
,
186 rs_waitremotebackfillreserved_latency
,
187 rs_waitlocalbackfillreserved_latency
,
188 rs_notbackfilling_latency
,
189 rs_repnotrecovering_latency
,
190 rs_repwaitrecoveryreserved_latency
,
191 rs_repwaitbackfillreserved_latency
,
192 rs_reprecovering_latency
,
193 rs_activating_latency
,
194 rs_waitlocalrecoveryreserved_latency
,
195 rs_waitremoterecoveryreserved_latency
,
196 rs_recovering_latency
,
197 rs_recovered_latency
,
200 rs_replicaactive_latency
,
204 rs_waitactingchange_latency
,
205 rs_incomplete_latency
,
207 rs_getmissing_latency
,
208 rs_waitupthru_latency
,
209 rs_notrecovering_latency
,
226 class AuthAuthorizeHandlerRegistry
;
228 class TestOpsSocketHook
;
229 struct C_CompleteSplits
;
233 typedef ceph::shared_ptr
<ObjectStore::Sequencer
> SequencerRef
;
236 class DeletingState
{
250 const PGRef old_pg_state
;
251 explicit DeletingState(const pair
<spg_t
, PGRef
> &in
) :
252 lock("DeletingState::lock"), status(QUEUED
), stop_deleting(false),
253 pgid(in
.first
), old_pg_state(in
.second
) {
256 /// transition status to CLEARING_WAITING
257 bool pause_clearing() {
258 Mutex::Locker
l(lock
);
259 assert(status
== CLEARING_DIR
);
265 status
= CLEARING_WAITING
;
267 } ///< @return false if we should cancel deletion
269 /// start or resume the clearing - transition the status to CLEARING_DIR
270 bool start_or_resume_clearing() {
271 Mutex::Locker
l(lock
);
274 status
== DELETED_DIR
||
275 status
== CLEARING_WAITING
);
281 status
= CLEARING_DIR
;
283 } ///< @return false if we should cancel the deletion
285 /// transition status to CLEARING_DIR
286 bool resume_clearing() {
287 Mutex::Locker
l(lock
);
288 assert(status
== CLEARING_WAITING
);
294 status
= CLEARING_DIR
;
296 } ///< @return false if we should cancel deletion
298 /// transition status to deleting
299 bool start_deleting() {
300 Mutex::Locker
l(lock
);
301 assert(status
== CLEARING_DIR
);
307 status
= DELETING_DIR
;
309 } ///< @return false if we should cancel deletion
311 /// signal collection removal queued
312 void finish_deleting() {
313 Mutex::Locker
l(lock
);
314 assert(status
== DELETING_DIR
);
315 status
= DELETED_DIR
;
319 /// try to halt the deletion
320 bool try_stop_deletion() {
321 Mutex::Locker
l(lock
);
322 stop_deleting
= true;
324 * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
325 * operations we have to wait for before continuing on. States
326 * CLEARING_WAITING and QUEUED indicate that the remover will check
327 * stop_deleting before queueing any further operations. CANCELED
328 * indicates that the remover has already halted. DELETED_DIR
329 * indicates that the deletion has been fully queued.
331 while (status
== DELETING_DIR
|| status
== CLEARING_DIR
)
333 return status
!= DELETED_DIR
;
334 } ///< @return true if we don't need to recreate the collection
336 typedef ceph::shared_ptr
<DeletingState
> DeletingStateRef
;
341 epoch_t epoch_queued
;
342 explicit PGScrub(epoch_t e
) : epoch_queued(e
) {}
343 ostream
&operator<<(ostream
&rhs
) {
344 return rhs
<< "PGScrub";
349 epoch_t epoch_queued
;
350 explicit PGSnapTrim(epoch_t e
) : epoch_queued(e
) {}
351 ostream
&operator<<(ostream
&rhs
) {
352 return rhs
<< "PGSnapTrim";
357 epoch_t epoch_queued
;
358 uint64_t reserved_pushes
;
359 PGRecovery(epoch_t e
, uint64_t reserved_pushes
)
360 : epoch_queued(e
), reserved_pushes(reserved_pushes
) {}
361 ostream
&operator<<(ostream
&rhs
) {
362 return rhs
<< "PGRecovery(epoch=" << epoch_queued
363 << ", reserved_pushes: " << reserved_pushes
<< ")";
368 typedef boost::variant
<
379 epoch_t map_epoch
; ///< an epoch we expect the PG to exist in
381 struct RunVis
: public boost::static_visitor
<> {
384 ThreadPool::TPHandle
&handle
;
385 RunVis(OSD
*osd
, PGRef
&pg
, ThreadPool::TPHandle
&handle
)
386 : osd(osd
), pg(pg
), handle(handle
) {}
387 void operator()(const OpRequestRef
&op
);
388 void operator()(const PGSnapTrim
&op
);
389 void operator()(const PGScrub
&op
);
390 void operator()(const PGRecovery
&op
);
393 struct StringifyVis
: public boost::static_visitor
<std::string
> {
394 std::string
operator()(const OpRequestRef
&op
) {
395 return stringify(op
);
397 std::string
operator()(const PGSnapTrim
&op
) {
400 std::string
operator()(const PGScrub
&op
) {
403 std::string
operator()(const PGRecovery
&op
) {
407 friend ostream
& operator<<(ostream
& out
, const PGQueueable
& q
) {
409 return out
<< "PGQueueable(" << boost::apply_visitor(v
, q
.qvariant
)
410 << " prio " << q
.priority
<< " cost " << q
.cost
411 << " e" << q
.map_epoch
<< ")";
415 // cppcheck-suppress noExplicitConstructor
416 PGQueueable(OpRequestRef op
, epoch_t e
)
417 : qvariant(op
), cost(op
->get_req()->get_cost()),
418 priority(op
->get_req()->get_priority()),
419 start_time(op
->get_req()->get_recv_stamp()),
420 owner(op
->get_req()->get_source_inst()),
424 const PGSnapTrim
&op
, int cost
, unsigned priority
, utime_t start_time
,
425 const entity_inst_t
&owner
, epoch_t e
)
426 : qvariant(op
), cost(cost
), priority(priority
), start_time(start_time
),
427 owner(owner
), map_epoch(e
) {}
429 const PGScrub
&op
, int cost
, unsigned priority
, utime_t start_time
,
430 const entity_inst_t
&owner
, epoch_t e
)
431 : qvariant(op
), cost(cost
), priority(priority
), start_time(start_time
),
432 owner(owner
), map_epoch(e
) {}
434 const PGRecovery
&op
, int cost
, unsigned priority
, utime_t start_time
,
435 const entity_inst_t
&owner
, epoch_t e
)
436 : qvariant(op
), cost(cost
), priority(priority
), start_time(start_time
),
437 owner(owner
), map_epoch(e
) {}
438 const boost::optional
<OpRequestRef
> maybe_get_op() const {
439 const OpRequestRef
*op
= boost::get
<OpRequestRef
>(&qvariant
);
440 return op
? OpRequestRef(*op
) : boost::optional
<OpRequestRef
>();
442 uint64_t get_reserved_pushes() const {
443 const PGRecovery
*op
= boost::get
<PGRecovery
>(&qvariant
);
444 return op
? op
->reserved_pushes
: 0;
446 void run(OSD
*osd
, PGRef
&pg
, ThreadPool::TPHandle
&handle
) {
447 RunVis
v(osd
, pg
, handle
);
448 boost::apply_visitor(v
, qvariant
);
450 unsigned get_priority() const { return priority
; }
451 int get_cost() const { return cost
; }
452 utime_t
get_start_time() const { return start_time
; }
453 entity_inst_t
get_owner() const { return owner
; }
454 epoch_t
get_map_epoch() const { return map_epoch
; }
461 SharedPtrRegistry
<spg_t
, ObjectStore::Sequencer
> osr_registry
;
462 ceph::shared_ptr
<ObjectStore::Sequencer
> meta_osr
;
463 SharedPtrRegistry
<spg_t
, DeletingState
> deleting_pgs
;
466 LogClient
&log_client
;
468 PGRecoveryStats
&pg_recovery_stats
;
470 Messenger
*&cluster_messenger
;
471 Messenger
*&client_messenger
;
473 PerfCounters
*&logger
;
474 PerfCounters
*&recoverystate_perf
;
476 ThreadPool::BatchWorkQueue
<PG
> &peering_wq
;
477 GenContextWQ recovery_gen_wq
;
478 ClassHandler
*&class_handler
;
480 void enqueue_back(spg_t pgid
, PGQueueable qi
);
481 void enqueue_front(spg_t pgid
, PGQueueable qi
);
483 void maybe_inject_dispatch_delay() {
484 if (g_conf
->osd_debug_inject_dispatch_delay_probability
> 0) {
486 g_conf
->osd_debug_inject_dispatch_delay_probability
* 10000) {
488 t
.set_from_double(g_conf
->osd_debug_inject_dispatch_delay_duration
);
495 // -- map epoch lower bound --
497 multiset
<epoch_t
> pg_epochs
;
498 map
<spg_t
,epoch_t
> pg_epoch
;
501 void pg_add_epoch(spg_t pgid
, epoch_t epoch
) {
502 Mutex::Locker
l(pg_epoch_lock
);
503 map
<spg_t
,epoch_t
>::iterator t
= pg_epoch
.find(pgid
);
504 assert(t
== pg_epoch
.end());
505 pg_epoch
[pgid
] = epoch
;
506 pg_epochs
.insert(epoch
);
508 void pg_update_epoch(spg_t pgid
, epoch_t epoch
) {
509 Mutex::Locker
l(pg_epoch_lock
);
510 map
<spg_t
,epoch_t
>::iterator t
= pg_epoch
.find(pgid
);
511 assert(t
!= pg_epoch
.end());
512 pg_epochs
.erase(pg_epochs
.find(t
->second
));
514 pg_epochs
.insert(epoch
);
516 void pg_remove_epoch(spg_t pgid
) {
517 Mutex::Locker
l(pg_epoch_lock
);
518 map
<spg_t
,epoch_t
>::iterator t
= pg_epoch
.find(pgid
);
519 if (t
!= pg_epoch
.end()) {
520 pg_epochs
.erase(pg_epochs
.find(t
->second
));
524 epoch_t
get_min_pg_epoch() {
525 Mutex::Locker
l(pg_epoch_lock
);
526 if (pg_epochs
.empty())
529 return *pg_epochs
.begin();
534 Mutex publish_lock
, pre_publish_lock
; // pre-publish orders before publish
535 OSDSuperblock superblock
;
538 OSDSuperblock
get_superblock() {
539 Mutex::Locker
l(publish_lock
);
542 void publish_superblock(const OSDSuperblock
&block
) {
543 Mutex::Locker
l(publish_lock
);
547 int get_nodeid() const { return whoami
; }
549 std::atomic
<epoch_t
> max_oldest_map
;
554 OSDMapRef
get_osdmap() {
555 Mutex::Locker
l(publish_lock
);
558 epoch_t
get_osdmap_epoch() {
559 Mutex::Locker
l(publish_lock
);
560 return osdmap
? osdmap
->get_epoch() : 0;
562 void publish_map(OSDMapRef map
) {
563 Mutex::Locker
l(publish_lock
);
568 * osdmap - current published map
569 * next_osdmap - pre_published map that is about to be published.
571 * We use the next_osdmap to send messages and initiate connections,
572 * but only if the target is the same instance as the one in the map
573 * epoch the current user is working from (i.e., the result is
574 * equivalent to what is in next_osdmap).
576 * This allows the helpers to start ignoring osds that are about to
577 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
578 * down, without worrying about reopening connections from threads
579 * working from old maps.
582 OSDMapRef next_osdmap
;
583 Cond pre_publish_cond
;
586 void pre_publish_map(OSDMapRef map
) {
587 Mutex::Locker
l(pre_publish_lock
);
588 next_osdmap
= std::move(map
);
592 /// map epochs reserved below
593 map
<epoch_t
, unsigned> map_reservations
;
595 /// gets ref to next_osdmap and registers the epoch as reserved
596 OSDMapRef
get_nextmap_reserved() {
597 Mutex::Locker
l(pre_publish_lock
);
600 epoch_t e
= next_osdmap
->get_epoch();
601 map
<epoch_t
, unsigned>::iterator i
=
602 map_reservations
.insert(make_pair(e
, 0)).first
;
606 /// releases reservation on map
607 void release_map(OSDMapRef osdmap
) {
608 Mutex::Locker
l(pre_publish_lock
);
609 map
<epoch_t
, unsigned>::iterator i
=
610 map_reservations
.find(osdmap
->get_epoch());
611 assert(i
!= map_reservations
.end());
612 assert(i
->second
> 0);
613 if (--(i
->second
) == 0) {
614 map_reservations
.erase(i
);
616 pre_publish_cond
.Signal();
618 /// blocks until there are no reserved maps prior to next_osdmap
619 void await_reserved_maps() {
620 Mutex::Locker
l(pre_publish_lock
);
623 map
<epoch_t
, unsigned>::const_iterator i
= map_reservations
.cbegin();
624 if (i
== map_reservations
.cend() || i
->first
>= next_osdmap
->get_epoch()) {
627 pre_publish_cond
.Wait(pre_publish_lock
);
633 Mutex peer_map_epoch_lock
;
634 map
<int, epoch_t
> peer_map_epoch
;
636 epoch_t
get_peer_epoch(int p
);
637 epoch_t
note_peer_epoch(int p
, epoch_t e
);
638 void forget_peer_epoch(int p
, epoch_t e
);
640 void send_map(class MOSDMap
*m
, Connection
*con
);
641 void send_incremental_map(epoch_t since
, Connection
*con
, OSDMapRef
& osdmap
);
642 MOSDMap
*build_incremental_map_msg(epoch_t from
, epoch_t to
,
643 OSDSuperblock
& superblock
);
644 bool should_share_map(entity_name_t name
, Connection
*con
, epoch_t epoch
,
645 const OSDMapRef
& osdmap
, const epoch_t
*sent_epoch_p
);
646 void share_map(entity_name_t name
, Connection
*con
, epoch_t epoch
,
647 OSDMapRef
& osdmap
, epoch_t
*sent_epoch_p
);
648 void share_map_peer(int peer
, Connection
*con
,
649 OSDMapRef map
= OSDMapRef());
651 ConnectionRef
get_con_osd_cluster(int peer
, epoch_t from_epoch
);
652 pair
<ConnectionRef
,ConnectionRef
> get_con_osd_hb(int peer
, epoch_t from_epoch
); // (back, front)
653 void send_message_osd_cluster(int peer
, Message
*m
, epoch_t from_epoch
);
654 void send_message_osd_cluster(Message
*m
, Connection
*con
) {
655 con
->send_message(m
);
657 void send_message_osd_cluster(Message
*m
, const ConnectionRef
& con
) {
658 con
->send_message(m
);
660 void send_message_osd_client(Message
*m
, Connection
*con
) {
661 con
->send_message(m
);
663 void send_message_osd_client(Message
*m
, const ConnectionRef
& con
) {
664 con
->send_message(m
);
666 entity_name_t
get_cluster_msgr_name() {
667 return cluster_messenger
->get_myname();
671 // -- scrub scheduling --
672 Mutex sched_scrub_lock
;
679 /// pg to be scrubbed
681 /// a time scheduled for scrub. but the scrub could be delayed if system
682 /// load is too high or it fails to fall in the scrub hours
684 /// the hard upper bound of scrub time
686 ScrubJob() : cct(nullptr) {}
687 explicit ScrubJob(CephContext
* cct
, const spg_t
& pg
,
688 const utime_t
& timestamp
,
689 double pool_scrub_min_interval
= 0,
690 double pool_scrub_max_interval
= 0, bool must
= true);
691 /// order the jobs by sched_time
692 bool operator<(const ScrubJob
& rhs
) const;
694 set
<ScrubJob
> sched_scrub_pg
;
696 /// @returns the scrub_reg_stamp used for unregister the scrub job
697 utime_t
reg_pg_scrub(spg_t pgid
, utime_t t
, double pool_scrub_min_interval
,
698 double pool_scrub_max_interval
, bool must
) {
699 ScrubJob
scrub(cct
, pgid
, t
, pool_scrub_min_interval
, pool_scrub_max_interval
,
701 Mutex::Locker
l(sched_scrub_lock
);
702 sched_scrub_pg
.insert(scrub
);
703 return scrub
.sched_time
;
705 void unreg_pg_scrub(spg_t pgid
, utime_t t
) {
706 Mutex::Locker
l(sched_scrub_lock
);
707 size_t removed
= sched_scrub_pg
.erase(ScrubJob(cct
, pgid
, t
));
710 bool first_scrub_stamp(ScrubJob
*out
) {
711 Mutex::Locker
l(sched_scrub_lock
);
712 if (sched_scrub_pg
.empty())
714 set
<ScrubJob
>::iterator iter
= sched_scrub_pg
.begin();
718 bool next_scrub_stamp(const ScrubJob
& next
,
720 Mutex::Locker
l(sched_scrub_lock
);
721 if (sched_scrub_pg
.empty())
723 set
<ScrubJob
>::const_iterator iter
= sched_scrub_pg
.lower_bound(next
);
724 if (iter
== sched_scrub_pg
.cend())
727 if (iter
== sched_scrub_pg
.cend())
733 void dumps_scrub(Formatter
*f
) {
734 assert(f
!= nullptr);
735 Mutex::Locker
l(sched_scrub_lock
);
737 f
->open_array_section("scrubs");
738 for (const auto &i
: sched_scrub_pg
) {
739 f
->open_object_section("scrub");
740 f
->dump_stream("pgid") << i
.pgid
;
741 f
->dump_stream("sched_time") << i
.sched_time
;
742 f
->dump_stream("deadline") << i
.deadline
;
743 f
->dump_bool("forced", i
.sched_time
== i
.deadline
);
749 bool can_inc_scrubs_pending();
750 bool inc_scrubs_pending();
751 void inc_scrubs_active(bool reserved
);
752 void dec_scrubs_pending();
753 void dec_scrubs_active();
755 void reply_op_error(OpRequestRef op
, int err
);
756 void reply_op_error(OpRequestRef op
, int err
, eversion_t v
, version_t uv
);
757 void handle_misdirected_op(PG
*pg
, OpRequestRef op
);
761 // -- agent shared state --
764 map
<uint64_t, set
<PGRef
> > agent_queue
;
765 set
<PGRef
>::iterator agent_queue_pos
;
766 bool agent_valid_iterator
;
768 int flush_mode_high_count
; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
769 set
<hobject_t
> agent_oids
;
771 struct AgentThread
: public Thread
{
773 explicit AgentThread(OSDService
*o
) : osd(o
) {}
774 void *entry() override
{
779 bool agent_stop_flag
;
780 Mutex agent_timer_lock
;
781 SafeTimer agent_timer
;
787 void _enqueue(PG
*pg
, uint64_t priority
) {
788 if (!agent_queue
.empty() &&
789 agent_queue
.rbegin()->first
< priority
)
790 agent_valid_iterator
= false; // inserting higher-priority queue
791 set
<PGRef
>& nq
= agent_queue
[priority
];
797 void _dequeue(PG
*pg
, uint64_t old_priority
) {
798 set
<PGRef
>& oq
= agent_queue
[old_priority
];
799 set
<PGRef
>::iterator p
= oq
.find(pg
);
800 assert(p
!= oq
.end());
801 if (p
== agent_queue_pos
)
805 if (agent_queue
.rbegin()->first
== old_priority
)
806 agent_valid_iterator
= false;
807 agent_queue
.erase(old_priority
);
811 /// enable agent for a pg
812 void agent_enable_pg(PG
*pg
, uint64_t priority
) {
813 Mutex::Locker
l(agent_lock
);
814 _enqueue(pg
, priority
);
817 /// adjust priority for an enagled pg
818 void agent_adjust_pg(PG
*pg
, uint64_t old_priority
, uint64_t new_priority
) {
819 Mutex::Locker
l(agent_lock
);
820 assert(new_priority
!= old_priority
);
821 _enqueue(pg
, new_priority
);
822 _dequeue(pg
, old_priority
);
825 /// disable agent for a pg
826 void agent_disable_pg(PG
*pg
, uint64_t old_priority
) {
827 Mutex::Locker
l(agent_lock
);
828 _dequeue(pg
, old_priority
);
831 /// note start of an async (evict) op
832 void agent_start_evict_op() {
833 Mutex::Locker
l(agent_lock
);
837 /// note finish or cancellation of an async (evict) op
838 void agent_finish_evict_op() {
839 Mutex::Locker
l(agent_lock
);
840 assert(agent_ops
> 0);
845 /// note start of an async (flush) op
846 void agent_start_op(const hobject_t
& oid
) {
847 Mutex::Locker
l(agent_lock
);
849 assert(agent_oids
.count(oid
) == 0);
850 agent_oids
.insert(oid
);
853 /// note finish or cancellation of an async (flush) op
854 void agent_finish_op(const hobject_t
& oid
) {
855 Mutex::Locker
l(agent_lock
);
856 assert(agent_ops
> 0);
858 assert(agent_oids
.count(oid
) == 1);
859 agent_oids
.erase(oid
);
863 /// check if we are operating on an object
864 bool agent_is_active_oid(const hobject_t
& oid
) {
865 Mutex::Locker
l(agent_lock
);
866 return agent_oids
.count(oid
);
869 /// get count of active agent ops
870 int agent_get_num_ops() {
871 Mutex::Locker
l(agent_lock
);
875 void agent_inc_high_count() {
876 Mutex::Locker
l(agent_lock
);
877 flush_mode_high_count
++;
880 void agent_dec_high_count() {
881 Mutex::Locker
l(agent_lock
);
882 flush_mode_high_count
--;
886 /// throttle promotion attempts
887 std::atomic_uint promote_probability_millis
{1000}; ///< probability thousands. one word.
888 PromoteCounter promote_counter
;
889 utime_t last_recalibrate
;
890 unsigned long promote_max_objects
, promote_max_bytes
;
893 bool promote_throttle() {
894 // NOTE: lockless! we rely on the probability being a single word.
895 promote_counter
.attempt();
896 if ((unsigned)rand() % 1000 > promote_probability_millis
)
897 return true; // yes throttle (no promote)
898 if (promote_max_objects
&&
899 promote_counter
.objects
> promote_max_objects
)
900 return true; // yes throttle
901 if (promote_max_bytes
&&
902 promote_counter
.bytes
> promote_max_bytes
)
903 return true; // yes throttle
904 return false; // no throttle (promote)
906 void promote_finish(uint64_t bytes
) {
907 promote_counter
.finish(bytes
);
909 void promote_throttle_recalibrate();
911 // -- Objecter, for tiering reads/writes from/to other OSDs --
913 Finisher objecter_finisher
;
917 SafeTimer watch_timer
;
918 uint64_t next_notif_id
;
919 uint64_t get_next_id(epoch_t cur_epoch
) {
920 Mutex::Locker
l(watch_lock
);
921 return (((uint64_t)cur_epoch
) << 32) | ((uint64_t)(next_notif_id
++));
924 // -- Recovery/Backfill Request Scheduling --
925 Mutex recovery_request_lock
;
926 SafeTimer recovery_request_timer
;
928 // For async recovery sleep
929 bool recovery_needs_sleep
= true;
930 utime_t recovery_schedule_time
= utime_t();
932 Mutex recovery_sleep_lock
;
933 SafeTimer recovery_sleep_timer
;
937 std::atomic_uint last_tid
{0};
938 ceph_tid_t
get_tid() {
939 return (ceph_tid_t
)last_tid
++;
942 // -- backfill_reservation --
943 Finisher reserver_finisher
;
944 AsyncReserver
<spg_t
> local_reserver
;
945 AsyncReserver
<spg_t
> remote_reserver
;
950 map
<pg_t
, vector
<int> > pg_temp_wanted
;
951 map
<pg_t
, vector
<int> > pg_temp_pending
;
952 void _sent_pg_temp();
954 void queue_want_pg_temp(pg_t pgid
, vector
<int>& want
);
955 void remove_want_pg_temp(pg_t pgid
);
956 void requeue_pg_temp();
959 void send_pg_created(pg_t pgid
);
961 void queue_for_peering(PG
*pg
);
963 Mutex snap_sleep_lock
;
964 SafeTimer snap_sleep_timer
;
966 Mutex scrub_sleep_lock
;
967 SafeTimer scrub_sleep_timer
;
969 AsyncReserver
<spg_t
> snap_reserver
;
970 void queue_for_snap_trim(PG
*pg
);
972 void queue_for_scrub(PG
*pg
, bool with_high_priority
) {
973 unsigned scrub_queue_priority
= pg
->scrubber
.priority
;
974 if (with_high_priority
&& scrub_queue_priority
< cct
->_conf
->osd_client_op_priority
) {
975 scrub_queue_priority
= cct
->_conf
->osd_client_op_priority
;
980 PGScrub(pg
->get_osdmap()->get_epoch()),
981 cct
->_conf
->osd_scrub_cost
,
982 scrub_queue_priority
,
985 pg
->get_osdmap()->get_epoch()));
989 // -- pg recovery and associated throttling --
991 list
<pair
<epoch_t
, PGRef
> > awaiting_throttle
;
993 utime_t defer_recovery_until
;
994 uint64_t recovery_ops_active
;
995 uint64_t recovery_ops_reserved
;
996 bool recovery_paused
;
997 #ifdef DEBUG_RECOVERY_OIDS
998 map
<spg_t
, set
<hobject_t
> > recovery_oids
;
1000 bool _recover_now(uint64_t *available_pushes
);
1001 void _maybe_queue_recovery();
1002 void _queue_for_recovery(
1003 pair
<epoch_t
, PGRef
> p
, uint64_t reserved_pushes
) {
1004 assert(recovery_lock
.is_locked_by_me());
1006 p
.second
->info
.pgid
,
1008 PGRecovery(p
.first
, reserved_pushes
),
1009 cct
->_conf
->osd_recovery_cost
,
1010 cct
->_conf
->osd_recovery_priority
,
1016 void start_recovery_op(PG
*pg
, const hobject_t
& soid
);
1017 void finish_recovery_op(PG
*pg
, const hobject_t
& soid
, bool dequeue
);
1018 bool is_recovery_active();
1019 void release_reserved_pushes(uint64_t pushes
) {
1020 Mutex::Locker
l(recovery_lock
);
1021 assert(recovery_ops_reserved
>= pushes
);
1022 recovery_ops_reserved
-= pushes
;
1023 _maybe_queue_recovery();
1025 void defer_recovery(float defer_for
) {
1026 defer_recovery_until
= ceph_clock_now();
1027 defer_recovery_until
+= defer_for
;
1029 void pause_recovery() {
1030 Mutex::Locker
l(recovery_lock
);
1031 recovery_paused
= true;
1033 bool recovery_is_paused() {
1034 Mutex::Locker
l(recovery_lock
);
1035 return recovery_paused
;
1037 void unpause_recovery() {
1038 Mutex::Locker
l(recovery_lock
);
1039 recovery_paused
= false;
1040 _maybe_queue_recovery();
1042 void kick_recovery_queue() {
1043 Mutex::Locker
l(recovery_lock
);
1044 _maybe_queue_recovery();
1046 void clear_queued_recovery(PG
*pg
) {
1047 Mutex::Locker
l(recovery_lock
);
1048 for (list
<pair
<epoch_t
, PGRef
> >::iterator i
= awaiting_throttle
.begin();
1049 i
!= awaiting_throttle
.end();
1051 if (i
->second
.get() == pg
) {
1052 awaiting_throttle
.erase(i
);
1059 // delayed pg activation
1060 void queue_for_recovery(PG
*pg
, bool front
= false) {
1061 Mutex::Locker
l(recovery_lock
);
1063 awaiting_throttle
.push_front(make_pair(pg
->get_osdmap()->get_epoch(), pg
));
1065 awaiting_throttle
.push_back(make_pair(pg
->get_osdmap()->get_epoch(), pg
));
1067 _maybe_queue_recovery();
1069 void queue_recovery_after_sleep(PG
*pg
, epoch_t queued
, uint64_t reserved_pushes
) {
1070 Mutex::Locker
l(recovery_lock
);
1071 _queue_for_recovery(make_pair(queued
, pg
), reserved_pushes
);
1075 // osd map cache (past osd maps)
1076 Mutex map_cache_lock
;
1077 SharedLRU
<epoch_t
, const OSDMap
> map_cache
;
1078 SimpleLRU
<epoch_t
, bufferlist
> map_bl_cache
;
1079 SimpleLRU
<epoch_t
, bufferlist
> map_bl_inc_cache
;
1081 OSDMapRef
try_get_map(epoch_t e
);
1082 OSDMapRef
get_map(epoch_t e
) {
1083 OSDMapRef
ret(try_get_map(e
));
1087 OSDMapRef
add_map(OSDMap
*o
) {
1088 Mutex::Locker
l(map_cache_lock
);
1091 OSDMapRef
_add_map(OSDMap
*o
);
1093 void add_map_bl(epoch_t e
, bufferlist
& bl
) {
1094 Mutex::Locker
l(map_cache_lock
);
1095 return _add_map_bl(e
, bl
);
1097 void pin_map_bl(epoch_t e
, bufferlist
&bl
);
1098 void _add_map_bl(epoch_t e
, bufferlist
& bl
);
1099 bool get_map_bl(epoch_t e
, bufferlist
& bl
) {
1100 Mutex::Locker
l(map_cache_lock
);
1101 return _get_map_bl(e
, bl
);
1103 bool _get_map_bl(epoch_t e
, bufferlist
& bl
);
1105 void add_map_inc_bl(epoch_t e
, bufferlist
& bl
) {
1106 Mutex::Locker
l(map_cache_lock
);
1107 return _add_map_inc_bl(e
, bl
);
1109 void pin_map_inc_bl(epoch_t e
, bufferlist
&bl
);
1110 void _add_map_inc_bl(epoch_t e
, bufferlist
& bl
);
1111 bool get_inc_map_bl(epoch_t e
, bufferlist
& bl
);
1113 void clear_map_bl_cache_pins(epoch_t e
);
1115 void need_heartbeat_peer_update();
1117 void pg_stat_queue_enqueue(PG
*pg
);
1118 void pg_stat_queue_dequeue(PG
*pg
);
1122 void start_shutdown();
1123 void shutdown_reserver();
1128 Mutex in_progress_split_lock
;
1129 map
<spg_t
, spg_t
> pending_splits
; // child -> parent
1130 map
<spg_t
, set
<spg_t
> > rev_pending_splits
; // parent -> [children]
1131 set
<spg_t
> in_progress_splits
; // child
1134 void _start_split(spg_t parent
, const set
<spg_t
> &children
);
1135 void start_split(spg_t parent
, const set
<spg_t
> &children
) {
1136 Mutex::Locker
l(in_progress_split_lock
);
1137 return _start_split(parent
, children
);
1139 void mark_split_in_progress(spg_t parent
, const set
<spg_t
> &pgs
);
1140 void complete_split(const set
<spg_t
> &pgs
);
1141 void cancel_pending_splits_for_parent(spg_t parent
);
1142 void _cancel_pending_splits_for_parent(spg_t parent
);
1143 bool splitting(spg_t pgid
);
1144 void expand_pg_num(OSDMapRef old_map
,
1146 void _maybe_split_pgid(OSDMapRef old_map
,
1149 void init_splits_between(spg_t pgid
, OSDMapRef frommap
, OSDMapRef tomap
);
1153 osd_stat_t osd_stat
;
1156 void update_osd_stat(vector
<int>& hb_peers
);
1157 osd_stat_t
get_osd_stat() {
1158 Mutex::Locker
l(stat_lock
);
1160 osd_stat
.up_from
= up_epoch
;
1161 osd_stat
.seq
= ((uint64_t)osd_stat
.up_from
<< 32) + seq
;
1164 uint64_t get_osd_stat_seq() {
1165 Mutex::Locker
l(stat_lock
);
1166 return osd_stat
.seq
;
1169 // -- OSD Full Status --
1171 friend TestOpsSocketHook
;
1172 mutable Mutex full_status_lock
;
1173 enum s_names
{ INVALID
= -1, NONE
, NEARFULL
, BACKFILLFULL
, FULL
, FAILSAFE
} cur_state
; // ascending
1174 const char *get_full_state_name(s_names s
) const {
1176 case NONE
: return "none";
1177 case NEARFULL
: return "nearfull";
1178 case BACKFILLFULL
: return "backfillfull";
1179 case FULL
: return "full";
1180 case FAILSAFE
: return "failsafe";
1181 default: return "???";
1184 s_names
get_full_state(string type
) const {
1187 else if (type
== "failsafe")
1189 else if (type
== "full")
1191 else if (type
== "backfillfull")
1192 return BACKFILLFULL
;
1193 else if (type
== "nearfull")
1198 double cur_ratio
; ///< current utilization
1199 mutable int64_t injectfull
= 0;
1200 s_names injectfull_state
= NONE
;
1201 float get_failsafe_full_ratio();
1202 void check_full_status(const osd_stat_t
&stat
);
1203 bool _check_full(s_names type
, ostream
&ss
) const;
1205 bool check_failsafe_full(ostream
&ss
) const;
1206 bool check_full(ostream
&ss
) const;
1207 bool check_backfill_full(ostream
&ss
) const;
1208 bool check_nearfull(ostream
&ss
) const;
1209 bool is_failsafe_full() const;
1210 bool is_full() const;
1211 bool is_backfillfull() const;
1212 bool is_nearfull() const;
1213 bool need_fullness_update(); ///< osdmap state needs update
1214 void set_injectfull(s_names type
, int64_t count
);
1215 bool check_osdmap_full(const set
<pg_shard_t
> &missing_on
);
1220 mutable Mutex epoch_lock
; // protects access to boot_epoch, up_epoch, bind_epoch
1221 epoch_t boot_epoch
; // _first_ epoch we were marked up (after this process started)
1222 epoch_t up_epoch
; // _most_recent_ epoch we were marked up
1223 epoch_t bind_epoch
; // epoch we last did a bind to new ip:ports
1226 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1227 * can be NULL if you don't care about them.
1229 void retrieve_epochs(epoch_t
*_boot_epoch
, epoch_t
*_up_epoch
,
1230 epoch_t
*_bind_epoch
) const;
1232 * Set the boot, up, and bind epochs. Any NULL params will not be set.
1234 void set_epochs(const epoch_t
*_boot_epoch
, const epoch_t
*_up_epoch
,
1235 const epoch_t
*_bind_epoch
);
1236 epoch_t
get_boot_epoch() const {
1238 retrieve_epochs(&ret
, NULL
, NULL
);
1241 epoch_t
get_up_epoch() const {
1243 retrieve_epochs(NULL
, &ret
, NULL
);
1246 epoch_t
get_bind_epoch() const {
1248 retrieve_epochs(NULL
, NULL
, &ret
);
1253 Mutex is_stopping_lock
;
1254 Cond is_stopping_cond
;
1259 std::atomic_int state
{NOT_STOPPING
};
1263 void set_state(int s
) {
1266 bool is_stopping() const {
1267 return state
== STOPPING
;
1269 bool is_preparing_to_stop() const {
1270 return state
== PREPARING_TO_STOP
;
1272 bool prepare_to_stop();
1273 void got_stop_ack();
1276 #ifdef PG_DEBUG_REFS
1278 map
<spg_t
, int> pgid_tracker
;
1279 map
<spg_t
, PG
*> live_pgs
;
1280 void add_pgid(spg_t pgid
, PG
*pg
);
1281 void remove_pgid(spg_t pgid
, PG
*pg
);
1282 void dump_live_pgids();
1285 explicit OSDService(OSD
*osd
);
1289 class OSD
: public Dispatcher
,
1290 public md_config_obs_t
{
1292 Mutex osd_lock
; // global lock
1293 SafeTimer tick_timer
; // safe timer (osd_lock)
1295 // Tick timer for those stuff that do not need osd_lock
1296 Mutex tick_timer_lock
;
1297 SafeTimer tick_timer_without_osd_lock
;
1299 // config observer bits
1300 const char** get_tracked_conf_keys() const override
;
1301 void handle_conf_change(const struct md_config_t
*conf
,
1302 const std::set
<std::string
> &changed
) override
;
1303 void update_log_config();
1304 void check_config();
1308 static const double OSD_TICK_INTERVAL
; // tick interval for tick_timer and tick_timer_without_osd_lock
1310 AuthAuthorizeHandlerRegistry
*authorize_handler_cluster_registry
;
1311 AuthAuthorizeHandlerRegistry
*authorize_handler_service_registry
;
1313 Messenger
*cluster_messenger
;
1314 Messenger
*client_messenger
;
1315 Messenger
*objecter_messenger
;
1316 MonClient
*monc
; // check the "monc helpers" list before accessing directly
1318 PerfCounters
*logger
;
1319 PerfCounters
*recoverystate_perf
;
1322 FuseStore
*fuse_store
= nullptr;
1324 LogClient log_client
;
1328 std::string dev_path
, journal_path
;
1330 bool store_is_rotational
= true;
1332 ZTracer::Endpoint trace_endpoint
;
1333 void create_logger();
1334 void create_recoverystate_perf();
1336 void tick_without_osd_lock();
1337 void _dispatch(Message
*m
);
1338 void dispatch_op(OpRequestRef op
);
1340 void check_osdmap_features(ObjectStore
*store
);
1343 friend class OSDSocketHook
;
1344 class OSDSocketHook
*asok_hook
;
1345 bool asok_command(string admin_command
, cmdmap_t
& cmdmap
, string format
, ostream
& ss
);
1348 ClassHandler
*class_handler
= nullptr;
1349 int get_nodeid() { return whoami
; }
1351 static ghobject_t
get_osdmap_pobject_name(epoch_t epoch
) {
1353 snprintf(foo
, sizeof(foo
), "osdmap.%d", epoch
);
1354 return ghobject_t(hobject_t(sobject_t(object_t(foo
), 0)));
1356 static ghobject_t
get_inc_osdmap_pobject_name(epoch_t epoch
) {
1358 snprintf(foo
, sizeof(foo
), "inc_osdmap.%d", epoch
);
1359 return ghobject_t(hobject_t(sobject_t(object_t(foo
), 0)));
1362 static ghobject_t
make_snapmapper_oid() {
1363 return ghobject_t(hobject_t(
1365 object_t("snapmapper"),
1369 static ghobject_t
make_pg_log_oid(spg_t pg
) {
1371 ss
<< "pglog_" << pg
;
1374 return ghobject_t(hobject_t(sobject_t(object_t(s
.c_str()), 0)));
1377 static ghobject_t
make_pg_biginfo_oid(spg_t pg
) {
1379 ss
<< "pginfo_" << pg
;
1382 return ghobject_t(hobject_t(sobject_t(object_t(s
.c_str()), 0)));
1384 static ghobject_t
make_infos_oid() {
1385 hobject_t
oid(sobject_t("infos", CEPH_NOSNAP
));
1386 return ghobject_t(oid
);
1388 static void recursive_remove_collection(CephContext
* cct
,
1394 * get_osd_initial_compat_set()
1396 * Get the initial feature set for this OSD. Features
1397 * here are automatically upgraded.
1399 * Return value: Initial osd CompatSet
1401 static CompatSet
get_osd_initial_compat_set();
1404 * get_osd_compat_set()
1406 * Get all features supported by this OSD
1408 * Return value: CompatSet of all supported features
1410 static CompatSet
get_osd_compat_set();
1415 class C_Tick_WithoutOSDLock
;
1418 OSDSuperblock superblock
;
1420 void write_superblock();
1421 void write_superblock(ObjectStore::Transaction
& t
);
1422 int read_superblock();
1424 void clear_temp_objects();
1426 CompatSet osd_compat
;
1431 STATE_INITIALIZING
= 1,
1436 STATE_WAITING_FOR_HEALTHY
1439 static const char *get_state_name(int s
) {
1441 case STATE_INITIALIZING
: return "initializing";
1442 case STATE_PREBOOT
: return "preboot";
1443 case STATE_BOOTING
: return "booting";
1444 case STATE_ACTIVE
: return "active";
1445 case STATE_STOPPING
: return "stopping";
1446 case STATE_WAITING_FOR_HEALTHY
: return "waiting_for_healthy";
1447 default: return "???";
1452 std::atomic_int state
{STATE_INITIALIZING
};
1455 int get_state() const {
1458 void set_state(int s
) {
1461 bool is_initializing() const {
1462 return state
== STATE_INITIALIZING
;
1464 bool is_preboot() const {
1465 return state
== STATE_PREBOOT
;
1467 bool is_booting() const {
1468 return state
== STATE_BOOTING
;
1470 bool is_active() const {
1471 return state
== STATE_ACTIVE
;
1473 bool is_stopping() const {
1474 return state
== STATE_STOPPING
;
1476 bool is_waiting_for_healthy() const {
1477 return state
== STATE_WAITING_FOR_HEALTHY
;
1482 ThreadPool peering_tp
;
1483 ShardedThreadPool osd_op_tp
;
1485 ThreadPool command_tp
;
1487 void set_disk_tp_priority();
1488 void get_latest_osdmap();
1492 void dispatch_session_waiting(Session
*session
, OSDMapRef osdmap
);
1493 void maybe_share_map(Session
*session
, OpRequestRef op
, OSDMapRef osdmap
);
1495 Mutex session_waiting_lock
;
1496 set
<Session
*> session_waiting_for_map
;
1498 /// Caller assumes refs for included Sessions
1499 void get_sessions_waiting_for_map(set
<Session
*> *out
) {
1500 Mutex::Locker
l(session_waiting_lock
);
1501 out
->swap(session_waiting_for_map
);
1503 void register_session_waiting_on_map(Session
*session
) {
1504 Mutex::Locker
l(session_waiting_lock
);
1505 if (session_waiting_for_map
.insert(session
).second
) {
1509 void clear_session_waiting_on_map(Session
*session
) {
1510 Mutex::Locker
l(session_waiting_lock
);
1511 set
<Session
*>::iterator i
= session_waiting_for_map
.find(session
);
1512 if (i
!= session_waiting_for_map
.end()) {
1514 session_waiting_for_map
.erase(i
);
1517 void dispatch_sessions_waiting_on_map() {
1518 set
<Session
*> sessions_to_check
;
1519 get_sessions_waiting_for_map(&sessions_to_check
);
1520 for (set
<Session
*>::iterator i
= sessions_to_check
.begin();
1521 i
!= sessions_to_check
.end();
1522 sessions_to_check
.erase(i
++)) {
1523 (*i
)->session_dispatch_lock
.Lock();
1524 dispatch_session_waiting(*i
, osdmap
);
1525 (*i
)->session_dispatch_lock
.Unlock();
1529 void session_handle_reset(Session
*session
) {
1530 Mutex::Locker
l(session
->session_dispatch_lock
);
1531 clear_session_waiting_on_map(session
);
1533 session
->clear_backoffs();
1535 /* Messages have connection refs, we need to clear the
1536 * connection->session->message->connection
1537 * cycles which result.
1540 session
->waiting_on_map
.clear_and_dispose(TrackedOp::Putter());
1545 * @defgroup monc helpers
1547 * Right now we only have the one
1551 * Ask the Monitors for a sequence of OSDMaps.
1553 * @param epoch The epoch to start with when replying
1554 * @param force_request True if this request forces a new subscription to
1555 * the monitors; false if an outstanding request that encompasses it is
1558 void osdmap_subscribe(version_t epoch
, bool force_request
);
1559 /** @} monc helpers */
1562 /// information about a heartbeat peer
1563 struct HeartbeatInfo
{
1565 ConnectionRef con_front
; ///< peer connection (front)
1566 ConnectionRef con_back
; ///< peer connection (back)
1567 utime_t first_tx
; ///< time we sent our first ping request
1568 utime_t last_tx
; ///< last time we sent a ping request
1569 utime_t last_rx_front
; ///< last time we got a ping reply on the front side
1570 utime_t last_rx_back
; ///< last time we got a ping reply on the back side
1571 epoch_t epoch
; ///< most recent epoch we wanted this peer
1573 bool is_unhealthy(utime_t cutoff
) const {
1575 ! ((last_rx_front
> cutoff
||
1576 (last_rx_front
== utime_t() && (last_tx
== utime_t() ||
1577 first_tx
> cutoff
))) &&
1578 (last_rx_back
> cutoff
||
1579 (last_rx_back
== utime_t() && (last_tx
== utime_t() ||
1580 first_tx
> cutoff
))));
1582 bool is_healthy(utime_t cutoff
) const {
1583 return last_rx_front
> cutoff
&& last_rx_back
> cutoff
;
1587 /// state attached to outgoing heartbeat connections
1588 struct HeartbeatSession
: public RefCountedObject
{
1590 explicit HeartbeatSession(int p
) : peer(p
) {}
1592 Mutex heartbeat_lock
;
1593 map
<int, int> debug_heartbeat_drops_remaining
;
1594 Cond heartbeat_cond
;
1595 bool heartbeat_stop
;
1596 std::atomic_bool heartbeat_need_update
;
1597 map
<int,HeartbeatInfo
> heartbeat_peers
; ///< map of osd id to HeartbeatInfo
1598 utime_t last_mon_heartbeat
;
1599 Messenger
*hb_front_client_messenger
;
1600 Messenger
*hb_back_client_messenger
;
1601 Messenger
*hb_front_server_messenger
;
1602 Messenger
*hb_back_server_messenger
;
1603 utime_t last_heartbeat_resample
; ///< last time we chose random peers in waiting-for-healthy state
1604 double daily_loadavg
;
1606 void _add_heartbeat_peer(int p
);
1607 void _remove_heartbeat_peer(int p
);
1608 bool heartbeat_reset(Connection
*con
);
1609 void maybe_update_heartbeat_peers();
1610 void reset_heartbeat_peers();
1611 bool heartbeat_peers_need_update() {
1612 return heartbeat_need_update
.load();
1614 void heartbeat_set_peers_need_update() {
1615 heartbeat_need_update
.store(true);
1617 void heartbeat_clear_peers_need_update() {
1618 heartbeat_need_update
.store(false);
1621 void heartbeat_check();
1622 void heartbeat_entry();
1623 void need_heartbeat_peer_update();
1625 void heartbeat_kick() {
1626 Mutex::Locker
l(heartbeat_lock
);
1627 heartbeat_cond
.Signal();
1630 struct T_Heartbeat
: public Thread
{
1632 explicit T_Heartbeat(OSD
*o
) : osd(o
) {}
1633 void *entry() override
{
1634 osd
->heartbeat_entry();
1640 bool heartbeat_dispatch(Message
*m
);
1642 struct HeartbeatDispatcher
: public Dispatcher
{
1644 explicit HeartbeatDispatcher(OSD
*o
) : Dispatcher(o
->cct
), osd(o
) {}
1646 bool ms_can_fast_dispatch_any() const override
{ return true; }
1647 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1648 switch (m
->get_type()) {
1656 void ms_fast_dispatch(Message
*m
) override
{
1657 osd
->heartbeat_dispatch(m
);
1659 bool ms_dispatch(Message
*m
) override
{
1660 return osd
->heartbeat_dispatch(m
);
1662 bool ms_handle_reset(Connection
*con
) override
{
1663 return osd
->heartbeat_reset(con
);
1665 void ms_handle_remote_reset(Connection
*con
) override
{}
1666 bool ms_handle_refused(Connection
*con
) override
{
1667 return osd
->ms_handle_refused(con
);
1669 bool ms_verify_authorizer(Connection
*con
, int peer_type
,
1670 int protocol
, bufferlist
& authorizer_data
, bufferlist
& authorizer_reply
,
1671 bool& isvalid
, CryptoKey
& session_key
) override
{
1675 } heartbeat_dispatcher
;
1679 list
<OpRequestRef
> finished
;
1681 void take_waiters(list
<OpRequestRef
>& ls
) {
1682 assert(osd_lock
.is_locked());
1683 finished
.splice(finished
.end(), ls
);
1687 // -- op tracking --
1688 OpTracker op_tracker
;
1689 void check_ops_in_flight();
1690 void test_ops(std::string command
, std::string args
, ostream
& ss
);
1691 friend class TestOpsSocketHook
;
1692 TestOpsSocketHook
*test_ops_hook
;
1693 friend struct C_CompleteSplits
;
1694 friend struct C_OpenPGs
;
1701 const io_queue op_queue
;
1702 const unsigned int op_prio_cutoff
;
1705 * The ordered op delivery chain is:
1707 * fast dispatch -> pqueue back
1708 * pqueue front <-> to_process back
1709 * to_process front -> RunVis(item)
1712 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1713 * pushed back up into to_process and/or pqueue while order is preserved.
1715 * Multiple worker threads can operate on each shard.
1717 * Under normal circumstances, num_running == to_proces.size(). There are
1718 * two times when that is not true: (1) when waiting_for_pg == true and
1719 * to_process is accumulating requests that are waiting for the pg to be
1720 * instantiated; in that case they will all get requeued together by
1721 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
1722 * and already requeued the items.
1724 friend class PGQueueable
;
1726 : public ShardedThreadPool::ShardedWQ
<pair
<spg_t
,PGQueueable
>>
1732 Mutex sdata_op_ordering_lock
; ///< protects all members below
1734 OSDMapRef waiting_for_pg_osdmap
;
1736 PGRef pg
; ///< cached pg reference [optional]
1737 list
<PGQueueable
> to_process
; ///< order items for this slot
1738 int num_running
= 0; ///< _process threads doing pg lookup/lock
1740 /// true if pg does/did not exist. if so all new items go directly to
1741 /// to_process. cleared by prune_pg_waiters.
1742 bool waiting_for_pg
= false;
1744 /// incremented by wake_pg_waiters; indicates racing _process threads
1745 /// should bail out (their op has been requeued)
1746 uint64_t requeue_seq
= 0;
1749 /// map of slots for each spg_t. maintains ordering of items dequeued
1750 /// from pqueue while _process thread drops shard lock to acquire the
1751 /// pg lock. slots are removed only by prune_pg_waiters.
1752 unordered_map
<spg_t
,pg_slot
> pg_slots
;
1755 std::unique_ptr
<OpQueue
< pair
<spg_t
, PGQueueable
>, entity_inst_t
>> pqueue
;
1757 void _enqueue_front(pair
<spg_t
, PGQueueable
> item
, unsigned cutoff
) {
1758 unsigned priority
= item
.second
.get_priority();
1759 unsigned cost
= item
.second
.get_cost();
1760 if (priority
>= cutoff
)
1761 pqueue
->enqueue_strict_front(
1762 item
.second
.get_owner(),
1765 pqueue
->enqueue_front(
1766 item
.second
.get_owner(),
1767 priority
, cost
, item
);
1771 string lock_name
, string ordering_lock
,
1772 uint64_t max_tok_per_prio
, uint64_t min_cost
, CephContext
*cct
,
1774 : sdata_lock(lock_name
.c_str(), false, true, false, cct
),
1775 sdata_op_ordering_lock(ordering_lock
.c_str(), false, true,
1777 if (opqueue
== weightedpriority
) {
1778 pqueue
= std::unique_ptr
1779 <WeightedPriorityQueue
<pair
<spg_t
,PGQueueable
>,entity_inst_t
>>(
1780 new WeightedPriorityQueue
<pair
<spg_t
,PGQueueable
>,entity_inst_t
>(
1781 max_tok_per_prio
, min_cost
));
1782 } else if (opqueue
== prioritized
) {
1783 pqueue
= std::unique_ptr
1784 <PrioritizedQueue
<pair
<spg_t
,PGQueueable
>,entity_inst_t
>>(
1785 new PrioritizedQueue
<pair
<spg_t
,PGQueueable
>,entity_inst_t
>(
1786 max_tok_per_prio
, min_cost
));
1791 vector
<ShardData
*> shard_list
;
1793 uint32_t num_shards
;
1796 ShardedOpWQ(uint32_t pnum_shards
,
1800 ShardedThreadPool
* tp
)
1801 : ShardedThreadPool::ShardedWQ
<pair
<spg_t
,PGQueueable
>>(ti
, si
, tp
),
1803 num_shards(pnum_shards
) {
1804 for (uint32_t i
= 0; i
< num_shards
; i
++) {
1805 char lock_name
[32] = {0};
1806 snprintf(lock_name
, sizeof(lock_name
), "%s.%d", "OSD:ShardedOpWQ:", i
);
1807 char order_lock
[32] = {0};
1808 snprintf(order_lock
, sizeof(order_lock
), "%s.%d",
1809 "OSD:ShardedOpWQ:order:", i
);
1810 ShardData
* one_shard
= new ShardData(
1811 lock_name
, order_lock
,
1812 osd
->cct
->_conf
->osd_op_pq_max_tokens_per_priority
,
1813 osd
->cct
->_conf
->osd_op_pq_min_cost
, osd
->cct
, osd
->op_queue
);
1814 shard_list
.push_back(one_shard
);
1817 ~ShardedOpWQ() override
{
1818 while (!shard_list
.empty()) {
1819 delete shard_list
.back();
1820 shard_list
.pop_back();
1824 /// wake any pg waiters after a PG is created/instantiated
1825 void wake_pg_waiters(spg_t pgid
);
1827 /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
1828 void prune_pg_waiters(OSDMapRef osdmap
, int whoami
);
1830 /// clear cached PGRef on pg deletion
1831 void clear_pg_pointer(spg_t pgid
);
1833 /// clear pg_slots on shutdown
1834 void clear_pg_slots();
1836 /// try to do some work
1837 void _process(uint32_t thread_index
, heartbeat_handle_d
*hb
) override
;
1839 /// enqueue a new item
1840 void _enqueue(pair
<spg_t
, PGQueueable
> item
) override
;
1842 /// requeue an old item (at the front of the line)
1843 void _enqueue_front(pair
<spg_t
, PGQueueable
> item
) override
;
1845 void return_waiting_threads() override
{
1846 for(uint32_t i
= 0; i
< num_shards
; i
++) {
1847 ShardData
* sdata
= shard_list
[i
];
1848 assert (NULL
!= sdata
);
1849 sdata
->sdata_lock
.Lock();
1850 sdata
->sdata_cond
.Signal();
1851 sdata
->sdata_lock
.Unlock();
1855 void dump(Formatter
*f
) {
1856 for(uint32_t i
= 0; i
< num_shards
; i
++) {
1857 ShardData
* sdata
= shard_list
[i
];
1858 char lock_name
[32] = {0};
1859 snprintf(lock_name
, sizeof(lock_name
), "%s%d", "OSD:ShardedOpWQ:", i
);
1860 assert (NULL
!= sdata
);
1861 sdata
->sdata_op_ordering_lock
.Lock();
1862 f
->open_object_section(lock_name
);
1863 sdata
->pqueue
->dump(f
);
1865 sdata
->sdata_op_ordering_lock
.Unlock();
1869 /// Must be called on ops queued back to front
1872 list
<OpRequestRef
> *out_ops
;
1873 uint64_t reserved_pushes_to_free
;
1874 Pred(spg_t pg
, list
<OpRequestRef
> *out_ops
= 0)
1875 : pgid(pg
), out_ops(out_ops
), reserved_pushes_to_free(0) {}
1876 void accumulate(const PGQueueable
&op
) {
1877 reserved_pushes_to_free
+= op
.get_reserved_pushes();
1879 boost::optional
<OpRequestRef
> mop
= op
.maybe_get_op();
1881 out_ops
->push_front(*mop
);
1884 bool operator()(const pair
<spg_t
, PGQueueable
> &op
) {
1885 if (op
.first
== pgid
) {
1886 accumulate(op
.second
);
1892 uint64_t get_reserved_pushes_to_free() const {
1893 return reserved_pushes_to_free
;
1897 bool is_shard_empty(uint32_t thread_index
) override
{
1898 uint32_t shard_index
= thread_index
% num_shards
;
1899 ShardData
* sdata
= shard_list
[shard_index
];
1900 assert(NULL
!= sdata
);
1901 Mutex::Locker
l(sdata
->sdata_op_ordering_lock
);
1902 return sdata
->pqueue
->empty();
1907 void enqueue_op(spg_t pg
, OpRequestRef
& op
, epoch_t epoch
);
1909 PGRef pg
, OpRequestRef op
,
1910 ThreadPool::TPHandle
&handle
);
1912 // -- peering queue --
1913 struct PeeringWQ
: public ThreadPool::BatchWorkQueue
<PG
> {
1914 list
<PG
*> peering_queue
;
1917 PeeringWQ(OSD
*o
, time_t ti
, time_t si
, ThreadPool
*tp
)
1918 : ThreadPool::BatchWorkQueue
<PG
>(
1919 "OSD::PeeringWQ", ti
, si
, tp
), osd(o
) {}
1921 void _dequeue(PG
*pg
) override
{
1922 for (list
<PG
*>::iterator i
= peering_queue
.begin();
1923 i
!= peering_queue
.end();
1926 peering_queue
.erase(i
++);
1927 pg
->put("PeeringWQ");
1933 bool _enqueue(PG
*pg
) override
{
1934 pg
->get("PeeringWQ");
1935 peering_queue
.push_back(pg
);
1938 bool _empty() override
{
1939 return peering_queue
.empty();
1941 void _dequeue(list
<PG
*> *out
) override
;
1943 const list
<PG
*> &pgs
,
1944 ThreadPool::TPHandle
&handle
) override
{
1945 assert(!pgs
.empty());
1946 osd
->process_peering_events(pgs
, handle
);
1947 for (list
<PG
*>::const_iterator i
= pgs
.begin();
1950 (*i
)->put("PeeringWQ");
1953 void _process_finish(const list
<PG
*> &pgs
) override
{
1954 for (list
<PG
*>::const_iterator i
= pgs
.begin();
1960 void _clear() override
{
1961 assert(peering_queue
.empty());
1965 void process_peering_events(
1966 const list
<PG
*> &pg
,
1967 ThreadPool::TPHandle
&handle
);
1970 friend class PrimaryLogPG
;
1977 OSDMapRef
get_osdmap() {
1980 epoch_t
get_osdmap_epoch() {
1981 return osdmap
? osdmap
->get_epoch() : 0;
1984 utime_t had_map_since
;
1986 list
<OpRequestRef
> waiting_for_osdmap
;
1987 deque
<utime_t
> osd_markdown_log
;
1989 friend struct send_map_on_destruct
;
1991 void wait_for_new_map(OpRequestRef op
);
1992 void handle_osd_map(class MOSDMap
*m
);
1993 void _committed_osd_maps(epoch_t first
, epoch_t last
, class MOSDMap
*m
);
1994 void trim_maps(epoch_t oldest
, int nreceived
, bool skip_maps
);
1995 void note_down_osd(int osd
);
1996 void note_up_osd(int osd
);
1997 friend class C_OnMapCommit
;
2000 epoch_t advance_to
, PG
*pg
,
2001 ThreadPool::TPHandle
&handle
,
2002 PG::RecoveryCtx
*rctx
,
2003 set
<PGRef
> *split_pgs
2006 void activate_map();
2008 // osd map cache (past osd maps)
2009 OSDMapRef
get_map(epoch_t e
) {
2010 return service
.get_map(e
);
2012 OSDMapRef
add_map(OSDMap
*o
) {
2013 return service
.add_map(o
);
2015 void add_map_bl(epoch_t e
, bufferlist
& bl
) {
2016 return service
.add_map_bl(e
, bl
);
2018 void pin_map_bl(epoch_t e
, bufferlist
&bl
) {
2019 return service
.pin_map_bl(e
, bl
);
2021 bool get_map_bl(epoch_t e
, bufferlist
& bl
) {
2022 return service
.get_map_bl(e
, bl
);
2024 void add_map_inc_bl(epoch_t e
, bufferlist
& bl
) {
2025 return service
.add_map_inc_bl(e
, bl
);
2027 void pin_map_inc_bl(epoch_t e
, bufferlist
&bl
) {
2028 return service
.pin_map_inc_bl(e
, bl
);
2032 // -- placement groups --
2033 RWLock pg_map_lock
; // this lock orders *above* individual PG _locks
2034 ceph::unordered_map
<spg_t
, PG
*> pg_map
; // protected by pg_map lock
2036 map
<spg_t
, list
<PG::CephPeeringEvtRef
> > peering_wait_for_split
;
2037 PGRecoveryStats pg_recovery_stats
;
2039 PGPool
_get_pool(int id
, OSDMapRef createmap
);
2041 PG
*_lookup_lock_pg_with_map_lock_held(spg_t pgid
);
2042 PG
*_lookup_lock_pg(spg_t pgid
);
2045 PG
*lookup_lock_pg(spg_t pgid
);
2048 PG
*_open_lock_pg(OSDMapRef createmap
,
2049 spg_t pg
, bool no_lockdep_check
=false);
2051 RES_PARENT
, // resurrected a parent
2052 RES_SELF
, // resurrected self
2053 RES_NONE
// nothing relevant deleting
2055 res_result
_try_resurrect_pg(
2056 OSDMapRef curmap
, spg_t pgid
, spg_t
*resurrected
, PGRef
*old_pg_state
);
2058 PG
*_create_lock_pg(
2059 OSDMapRef createmap
,
2064 vector
<int>& up
, int up_primary
,
2065 vector
<int>& acting
, int acting_primary
,
2066 pg_history_t history
,
2067 const PastIntervals
& pi
,
2068 ObjectStore::Transaction
& t
);
2070 PG
* _make_pg(OSDMapRef createmap
, spg_t pgid
);
2071 void add_newly_split_pg(PG
*pg
,
2072 PG::RecoveryCtx
*rctx
);
2074 int handle_pg_peering_evt(
2076 const pg_history_t
& orig_history
,
2077 const PastIntervals
& pi
,
2079 PG::CephPeeringEvtRef evt
);
2082 void build_past_intervals_parallel();
2084 /// build initial pg history and intervals on create
2085 void build_initial_pg_history(
2088 utime_t created_stamp
,
2092 /// project pg history from from to now
2093 bool project_pg_history(
2094 spg_t pgid
, pg_history_t
& h
, epoch_t from
,
2095 const vector
<int>& lastup
,
2097 const vector
<int>& lastacting
,
2098 int lastactingprimary
2099 ); ///< @return false if there was a map gap between from and now
2101 // this must be called with pg->lock held on any pg addition to pg_map
2102 void wake_pg_waiters(PGRef pg
) {
2103 assert(pg
->is_locked());
2104 op_shardedwq
.wake_pg_waiters(pg
->info
.pgid
);
2106 epoch_t last_pg_create_epoch
;
2108 void handle_pg_create(OpRequestRef op
);
2112 const set
<spg_t
> &childpgids
, set
<PGRef
> *out_pgs
,
2115 PG::RecoveryCtx
*rctx
);
2117 // == monitor interaction ==
2118 Mutex mon_report_lock
;
2119 utime_t last_mon_report
;
2120 utime_t last_pg_stats_sent
;
2122 /* if our monitor dies, we want to notice it and reconnect.
2123 * So we keep track of when it last acked our stat updates,
2124 * and if too much time passes (and we've been sending
2125 * more updates) then we can call it dead and reconnect
2128 utime_t last_pg_stats_ack
;
2129 float stats_ack_timeout
;
2130 set
<uint64_t> outstanding_pg_stats
; // how many stat updates haven't been acked yet
2134 void _got_mon_epochs(epoch_t oldest
, epoch_t newest
);
2135 void _preboot(epoch_t oldest
, epoch_t newest
);
2137 void _collect_metadata(map
<string
,string
> *pmeta
);
2139 void start_waiting_for_healthy();
2142 void send_full_update();
2144 friend struct C_OSD_GetVersion
;
2147 epoch_t up_thru_wanted
;
2149 void queue_want_up_thru(epoch_t want
);
2152 // -- full map requests --
2153 epoch_t requested_full_first
, requested_full_last
;
2155 void request_full_map(epoch_t first
, epoch_t last
);
2156 void rerequest_full_maps() {
2157 epoch_t first
= requested_full_first
;
2158 epoch_t last
= requested_full_last
;
2159 requested_full_first
= 0;
2160 requested_full_last
= 0;
2161 request_full_map(first
, last
);
2163 void got_full_map(epoch_t e
);
2166 map
<int,utime_t
> failure_queue
;
2167 map
<int,pair
<utime_t
,entity_inst_t
> > failure_pending
;
2169 void requeue_failures();
2170 void send_failures();
2171 void send_still_alive(epoch_t epoch
, const entity_inst_t
&i
);
2174 Mutex pg_stat_queue_lock
;
2175 Cond pg_stat_queue_cond
;
2176 xlist
<PG
*> pg_stat_queue
;
2177 bool osd_stat_updated
;
2178 uint64_t pg_stat_tid
, pg_stat_tid_flushed
;
2180 void send_pg_stats(const utime_t
&now
);
2181 void handle_pg_stats_ack(class MPGStatsAck
*ack
);
2182 void flush_pg_stats();
2184 ceph::coarse_mono_clock::time_point last_sent_beacon
;
2185 Mutex min_last_epoch_clean_lock
{"OSD::min_last_epoch_clean_lock"};
2186 epoch_t min_last_epoch_clean
= 0;
2187 // which pgs were scanned for min_lec
2188 std::vector
<pg_t
> min_last_epoch_clean_pgs
;
2189 void send_beacon(const ceph::coarse_mono_clock::time_point
& now
);
2191 void pg_stat_queue_enqueue(PG
*pg
) {
2192 pg_stat_queue_lock
.Lock();
2193 if (pg
->is_primary() && !pg
->stat_queue_item
.is_on_list()) {
2194 pg
->get("pg_stat_queue");
2195 pg_stat_queue
.push_back(&pg
->stat_queue_item
);
2197 osd_stat_updated
= true;
2198 pg_stat_queue_lock
.Unlock();
2200 void pg_stat_queue_dequeue(PG
*pg
) {
2201 pg_stat_queue_lock
.Lock();
2202 if (pg
->stat_queue_item
.remove_myself())
2203 pg
->put("pg_stat_queue");
2204 pg_stat_queue_lock
.Unlock();
2206 void clear_pg_stat_queue() {
2207 pg_stat_queue_lock
.Lock();
2208 while (!pg_stat_queue
.empty()) {
2209 PG
*pg
= pg_stat_queue
.front();
2210 pg_stat_queue
.pop_front();
2211 pg
->put("pg_stat_queue");
2213 pg_stat_queue_lock
.Unlock();
2216 ceph_tid_t
get_tid() {
2217 return service
.get_tid();
2220 // -- generic pg peering --
2221 PG::RecoveryCtx
create_context();
2222 void dispatch_context(PG::RecoveryCtx
&ctx
, PG
*pg
, OSDMapRef curmap
,
2223 ThreadPool::TPHandle
*handle
= NULL
);
2224 void dispatch_context_transaction(PG::RecoveryCtx
&ctx
, PG
*pg
,
2225 ThreadPool::TPHandle
*handle
= NULL
);
2226 void do_notifies(map
<int,
2227 vector
<pair
<pg_notify_t
, PastIntervals
> > >&
2230 void do_queries(map
<int, map
<spg_t
,pg_query_t
> >& query_map
,
2232 void do_infos(map
<int,
2233 vector
<pair
<pg_notify_t
, PastIntervals
> > >& info_map
,
2236 bool require_mon_peer(const Message
*m
);
2237 bool require_mon_or_mgr_peer(const Message
*m
);
2238 bool require_osd_peer(const Message
*m
);
2240 * Verifies that we were alive in the given epoch, and that
2243 bool require_self_aliveness(const Message
*m
, epoch_t alive_since
);
2245 * Verifies that the OSD who sent the given op has the same
2246 * address as in the given map.
2247 * @pre op was sent by an OSD using the cluster messenger
2249 bool require_same_peer_instance(const Message
*m
, OSDMapRef
& map
,
2250 bool is_fast_dispatch
);
2252 bool require_same_or_newer_map(OpRequestRef
& op
, epoch_t e
,
2253 bool is_fast_dispatch
);
  void handle_pg_query(OpRequestRef op);
  void handle_pg_notify(OpRequestRef op);
  void handle_pg_log(OpRequestRef op);
  void handle_pg_info(OpRequestRef op);
  void handle_pg_trim(OpRequestRef op);

  void handle_pg_backfill_reserve(OpRequestRef op);
  void handle_pg_recovery_reserve(OpRequestRef op);

  void handle_pg_remove(OpRequestRef op);
  void _remove_pg(PG *pg);
  // -- commands --
  struct Command {
    vector<string> cmd;
    ceph_tid_t tid;
    bufferlist indata;
    ConnectionRef con;

    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
      : cmd(c), tid(t), indata(bl), con(co) {}
  };
  list<Command*> command_queue;
  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
    OSD *osd;
    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}

    bool _empty() override {
      return osd->command_queue.empty();
    }
    bool _enqueue(Command *c) override {
      osd->command_queue.push_back(c);
      return true;
    }
    void _dequeue(Command *pg) override {
      ceph_abort();
    }
    Command *_dequeue() override {
      if (osd->command_queue.empty())
        return NULL;
      Command *c = osd->command_queue.front();
      osd->command_queue.pop_front();
      return c;
    }
    void _process(Command *c, ThreadPool::TPHandle &) override {
      osd->osd_lock.Lock();
      if (osd->is_stopping()) {
        osd->osd_lock.Unlock();
        delete c;
        return;
      }
      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
      osd->osd_lock.Unlock();
      delete c;
    }
    void _clear() override {
      while (!osd->command_queue.empty()) {
        Command *c = osd->command_queue.front();
        osd->command_queue.pop_front();
        delete c;
      }
    }
  } command_wq;
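  // Command ownership: handle_command() allocates a Command and queues it
  // here; whichever of _process() or _clear() consumes the entry deletes
  // it, so no one else may retain the pointer.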
  void handle_command(class MMonCommand *m);
  void handle_command(class MCommand *m);
  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
  // -- pg recovery --
  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                   ThreadPool::TPHandle &handle);
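  // do_recovery() runs from the recovery work queue; pushes_reserved is
  // the number of recovery ops reserved for this round, and epoch_queued
  // lets it detect that the PG advanced after the work was queued and the
  // request has become stale.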
  // -- scrubbing --
  bool scrub_random_backoff();
  bool scrub_load_below_threshold();
  bool scrub_time_permit(utime_t now);
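  // The three checks above gate a scheduled scrub.  How a caller combines
  // them (a sketch; the real logic lives in the scrub scheduling path):
  //
  //   if (!scrub_random_backoff() &&           // coin flip says proceed
  //       scrub_load_below_threshold() &&      // system load acceptable
  //       scrub_time_permit(ceph_clock_now())) // inside allowed time window
  //     ; // ok to kick off the next scheduled scrub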
  // -- removing --
  struct RemoveWQ :
    public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
    CephContext* cct;
    ObjectStore *&store;
    list<pair<PGRef, DeletingStateRef> > remove_queue;
    RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
             ThreadPool *tp)
      : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
          "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}

    bool _empty() override {
      return remove_queue.empty();
    }
    void _enqueue(pair<PGRef, DeletingStateRef> item) override {
      remove_queue.push_back(item);
    }
    void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
      remove_queue.push_front(item);
    }
    bool _dequeue(pair<PGRef, DeletingStateRef> item) {
      ceph_abort();
    }
    pair<PGRef, DeletingStateRef> _dequeue() override {
      assert(!remove_queue.empty());
      pair<PGRef, DeletingStateRef> item = remove_queue.front();
      remove_queue.pop_front();
      return item;
    }
    void _process(pair<PGRef, DeletingStateRef>,
                  ThreadPool::TPHandle &) override;
    void _clear() override {
      remove_queue.clear();
    }
  } remove_wq;
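  // PG deletion is incremental: _process() removes a PG's objects in
  // batches, consulting the associated DeletingState between batches so
  // an in-flight deletion can be paused or cancelled (e.g. if the PG is
  // about to be recreated).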
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_OP:
    case CEPH_MSG_OSD_BACKOFF:
    case MSG_OSD_SUBOPREPLY:
    case MSG_OSD_REPOPREPLY:
    case MSG_OSD_PG_PUSH:
    case MSG_OSD_PG_PULL:
    case MSG_OSD_PG_PUSH_REPLY:
    case MSG_OSD_PG_SCAN:
    case MSG_OSD_PG_BACKFILL:
    case MSG_OSD_PG_BACKFILL_REMOVE:
    case MSG_OSD_EC_WRITE:
    case MSG_OSD_EC_WRITE_REPLY:
    case MSG_OSD_EC_READ:
    case MSG_OSD_EC_READ_REPLY:
    case MSG_OSD_SCRUB_RESERVE:
    case MSG_OSD_REP_SCRUB:
    case MSG_OSD_REP_SCRUBMAP:
    case MSG_OSD_PG_UPDATE_LOG_MISSING:
    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
      return true;
    default:
      return false;
    }
  }
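  // Message types accepted above are delivered directly from the
  // messenger thread via ms_fast_dispatch(), bypassing the ordered
  // ms_dispatch() queue; they must therefore be handled without blocking
  // on osd_lock.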
  void ms_fast_dispatch(Message *m) override;
  void ms_fast_preprocess(Message *m) override;
  bool ms_dispatch(Message *m) override;
  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
  bool ms_verify_authorizer(Connection *con, int peer_type,
                            int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key) override;
  void ms_handle_connect(Connection *con) override;
  void ms_handle_fast_connect(Connection *con) override;
  void ms_handle_fast_accept(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override;
  io_queue get_io_queue() const {
    if (cct->_conf->osd_op_queue == "debug_random") {
      return (rand() % 2 < 1) ? prioritized : weightedpriority;
    } else if (cct->_conf->osd_op_queue == "wpq") {
      return weightedpriority;
    } else {
      return prioritized;
    }
  }
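  // The op queue implementation is chosen once at startup from the
  // osd_op_queue option; "debug_random" exists so test runs keep
  // exercising both implementations.  E.g. in ceph.conf:
  //
  //   [osd]
  //   osd op queue = wpq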
  unsigned int get_io_prio_cut() const {
    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
    } else if (cct->_conf->osd_op_queue_cut_off == "low") {
      return CEPH_MSG_PRIO_LOW;
    } else {
      return CEPH_MSG_PRIO_HIGH;
    }
  }
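  // get_io_prio_cut() picks the priority at or above which ops bypass the
  // fair-sharing queue and are serviced in strict priority order; it is
  // driven by the osd_op_queue_cut_off option ("low", "high", or
  // "debug_random").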
  /* internal and external can point to the same messenger, they will still
   * be cleaned up properly */
  OSD(CephContext *cct_,
      ObjectStore *store_,
      int id,
      Messenger *internal,
      Messenger *external,
      Messenger *hb_front_client,
      Messenger *hb_back_client,
      Messenger *hb_front_server,
      Messenger *hb_back_server,
      Messenger *osdc_messenger,
      MonClient *mc, const std::string &dev, const std::string &jdev);
  static int mkfs(CephContext *cct, ObjectStore *store,
                  const string& dev,
                  uuid_d fsid, int whoami);
  /* remove any non-user xattrs from a map of them */
  void filter_xattrs(map<string, bufferptr>& attrs) {
    for (map<string, bufferptr>::iterator iter = attrs.begin();
         iter != attrs.end();
         ) {
      if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
        attrs.erase(iter++);
      else
        ++iter;
    }
  }
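  // Example: for keys {"_foo", "_", "snapset"} only "_foo" survives; a
  // user xattr key must start with '_' and be longer than one character.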
  int mon_cmd_maybe_osd_create(string &cmd);
  int update_crush_device_class();
  int update_crush_location();

  static int write_meta(ObjectStore *store,
                        uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);

  void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
  void handle_scrub(struct MOSDScrub *m);
  void handle_osd_ping(class MOSDPing *m);

  int init_op_flags(OpRequestRef& op);

  int get_num_op_shards();
  int get_num_op_threads();
  static int peek_meta(ObjectStore *store, string& magic,
                       uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);

  int enable_disable_fuse(bool stop);

  void suicide(int exitcode);

  void handle_signal(int signum);

  /// check if we can throw out op from a disconnected client
  static bool op_is_discardable(const MOSDOp *m);
  friend class OSDService;
};

// compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];