1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include "msg/Dispatcher.h"
22 #include "common/async/context_pool.h"
23 #include "common/Timer.h"
24 #include "common/WorkQueue.h"
25 #include "common/AsyncReserver.h"
26 #include "common/ceph_context.h"
27 #include "common/config_cacher.h"
28 #include "common/zipkin_trace.h"
29 #include "common/ceph_timer.h"
31 #include "mgr/MgrClient.h"
33 #include "os/ObjectStore.h"
35 #include "include/CompatSet.h"
36 #include "include/common_fwd.h"
38 #include "OpRequest.h"
41 #include "osd/scheduler/OpScheduler.h"
48 #include "include/unordered_map.h"
50 #include "common/shared_cache.hpp"
51 #include "common/simple_cache.hpp"
52 #include "messages/MOSDOp.h"
53 #include "common/EventTrace.h"
54 #include "osd/osd_perf_counters.h"
55 #include "common/Finisher.h"
56 #include "scrubber/osd_scrub_sched.h"
58 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
62 lock ordering for pg map
83 class TestOpsSocketHook
;
84 struct C_FinishSplits
;
92 class MOSDForceRecovery
;
93 class MMonGetPurgedSnapsReply
;
98 using OpSchedulerItem
= ceph::osd::scheduler::OpSchedulerItem
;
102 ObjectStore::CollectionHandle meta_ch
;
104 ObjectStore
* const store
;
105 LogClient
&log_client
;
107 PGRecoveryStats
&pg_recovery_stats
;
109 Messenger
*&cluster_messenger
;
110 Messenger
*&client_messenger
;
112 PerfCounters
*&logger
;
113 PerfCounters
*&recoverystate_perf
;
116 md_config_cacher_t
<Option::size_t> osd_max_object_size
;
117 md_config_cacher_t
<bool> osd_skip_data_digest
;
119 void enqueue_back(OpSchedulerItem
&& qi
);
120 void enqueue_front(OpSchedulerItem
&& qi
);
122 void maybe_inject_dispatch_delay() {
123 if (g_conf()->osd_debug_inject_dispatch_delay_probability
> 0) {
125 g_conf()->osd_debug_inject_dispatch_delay_probability
* 10000) {
127 t
.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration
);
133 ceph::signedspan
get_mnow();
137 ceph::mutex publish_lock
, pre_publish_lock
; // pre-publish orders before publish
138 OSDSuperblock superblock
;
141 OSDSuperblock
get_superblock() {
142 std::lock_guard
l(publish_lock
);
145 void publish_superblock(const OSDSuperblock
&block
) {
146 std::lock_guard
l(publish_lock
);
150 int get_nodeid() const { return whoami
; }
152 std::atomic
<epoch_t
> max_oldest_map
;
157 OSDMapRef
get_osdmap() {
158 std::lock_guard
l(publish_lock
);
161 epoch_t
get_osdmap_epoch() {
162 std::lock_guard
l(publish_lock
);
163 return osdmap
? osdmap
->get_epoch() : 0;
165 void publish_map(OSDMapRef map
) {
166 std::lock_guard
l(publish_lock
);
171 * osdmap - current published std::map
172 * next_osdmap - pre_published std::map that is about to be published.
174 * We use the next_osdmap to send messages and initiate connections,
175 * but only if the target is the same instance as the one in the std::map
176 * epoch the current user is working from (i.e., the result is
177 * equivalent to what is in next_osdmap).
179 * This allows the helpers to start ignoring osds that are about to
180 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
181 * down, without worrying about reopening connections from threads
182 * working from old maps.
185 OSDMapRef next_osdmap
;
186 ceph::condition_variable pre_publish_cond
;
187 int pre_publish_waiter
= 0;
190 void pre_publish_map(OSDMapRef map
) {
191 std::lock_guard
l(pre_publish_lock
);
192 next_osdmap
= std::move(map
);
196 /// map epochs reserved below
197 std::map
<epoch_t
, unsigned> map_reservations
;
199 /// gets ref to next_osdmap and registers the epoch as reserved
200 OSDMapRef
get_nextmap_reserved() {
201 std::lock_guard
l(pre_publish_lock
);
202 epoch_t e
= next_osdmap
->get_epoch();
203 std::map
<epoch_t
, unsigned>::iterator i
=
204 map_reservations
.insert(std::make_pair(e
, 0)).first
;
208 /// releases reservation on map
209 void release_map(OSDMapRef osdmap
) {
210 std::lock_guard
l(pre_publish_lock
);
211 std::map
<epoch_t
, unsigned>::iterator i
=
212 map_reservations
.find(osdmap
->get_epoch());
213 ceph_assert(i
!= map_reservations
.end());
214 ceph_assert(i
->second
> 0);
215 if (--(i
->second
) == 0) {
216 map_reservations
.erase(i
);
218 if (pre_publish_waiter
) {
219 pre_publish_cond
.notify_all();
222 /// blocks until there are no reserved maps prior to next_osdmap
223 void await_reserved_maps() {
224 std::unique_lock l
{pre_publish_lock
};
225 ceph_assert(next_osdmap
);
226 pre_publish_waiter
++;
227 pre_publish_cond
.wait(l
, [this] {
228 auto i
= map_reservations
.cbegin();
229 return (i
== map_reservations
.cend() ||
230 i
->first
>= next_osdmap
->get_epoch());
232 pre_publish_waiter
--;
234 OSDMapRef
get_next_osdmap() {
235 std::lock_guard
l(pre_publish_lock
);
239 void maybe_share_map(Connection
*con
,
240 const OSDMapRef
& osdmap
,
241 epoch_t peer_epoch_lb
=0);
243 void send_map(class MOSDMap
*m
, Connection
*con
);
244 void send_incremental_map(epoch_t since
, Connection
*con
,
245 const OSDMapRef
& osdmap
);
246 MOSDMap
*build_incremental_map_msg(epoch_t from
, epoch_t to
,
247 OSDSuperblock
& superblock
);
249 ConnectionRef
get_con_osd_cluster(int peer
, epoch_t from_epoch
);
250 std::pair
<ConnectionRef
,ConnectionRef
> get_con_osd_hb(int peer
, epoch_t from_epoch
); // (back, front)
251 void send_message_osd_cluster(int peer
, Message
*m
, epoch_t from_epoch
);
252 void send_message_osd_cluster(std::vector
<std::pair
<int, Message
*>>& messages
, epoch_t from_epoch
);
253 void send_message_osd_cluster(MessageRef m
, Connection
*con
) {
254 con
->send_message2(std::move(m
));
256 void send_message_osd_cluster(Message
*m
, const ConnectionRef
& con
) {
257 con
->send_message(m
);
259 void send_message_osd_client(Message
*m
, const ConnectionRef
& con
) {
260 con
->send_message(m
);
262 entity_name_t
get_cluster_msgr_name() const;
267 void reply_op_error(OpRequestRef op
, int err
);
268 void reply_op_error(OpRequestRef op
, int err
, eversion_t v
, version_t uv
,
269 std::vector
<pg_log_op_return_item_t
> op_returns
);
270 void handle_misdirected_op(PG
*pg
, OpRequestRef op
);
274 * The entity that maintains the set of PGs we may scrub (i.e. - those that we
275 * are their primary), and schedules their scrubbing.
277 ScrubQueue m_scrub_queue
;
280 ScrubQueue
& get_scrub_services() { return m_scrub_queue
; }
283 * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
285 * The request might fail for multiple reasons, as ScrubQueue cannot by its own
286 * check some of the PG-specific preconditions and those are checked here. See
287 * attempt_t definition.
289 * @param pgid to scrub
290 * @param allow_requested_repair_only
291 * @return a Scrub::attempt_t detailing either a success, or the failure reason.
293 Scrub::schedule_result_t
initiate_a_scrub(spg_t pgid
, bool allow_requested_repair_only
);
297 // -- agent shared state --
298 ceph::mutex agent_lock
= ceph::make_mutex("OSDService::agent_lock");
299 ceph::condition_variable agent_cond
;
300 std::map
<uint64_t, std::set
<PGRef
> > agent_queue
;
301 std::set
<PGRef
>::iterator agent_queue_pos
;
302 bool agent_valid_iterator
;
304 int flush_mode_high_count
; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
305 std::set
<hobject_t
> agent_oids
;
307 struct AgentThread
: public Thread
{
309 explicit AgentThread(OSDService
*o
) : osd(o
) {}
310 void *entry() override
{
315 bool agent_stop_flag
;
316 ceph::mutex agent_timer_lock
= ceph::make_mutex("OSDService::agent_timer_lock");
317 SafeTimer agent_timer
;
323 void _enqueue(PG
*pg
, uint64_t priority
) {
324 if (!agent_queue
.empty() &&
325 agent_queue
.rbegin()->first
< priority
)
326 agent_valid_iterator
= false; // inserting higher-priority queue
327 std::set
<PGRef
>& nq
= agent_queue
[priority
];
329 agent_cond
.notify_all();
333 void _dequeue(PG
*pg
, uint64_t old_priority
) {
334 std::set
<PGRef
>& oq
= agent_queue
[old_priority
];
335 std::set
<PGRef
>::iterator p
= oq
.find(pg
);
336 ceph_assert(p
!= oq
.end());
337 if (p
== agent_queue_pos
)
341 if (agent_queue
.rbegin()->first
== old_priority
)
342 agent_valid_iterator
= false;
343 agent_queue
.erase(old_priority
);
347 /// enable agent for a pg
348 void agent_enable_pg(PG
*pg
, uint64_t priority
) {
349 std::lock_guard
l(agent_lock
);
350 _enqueue(pg
, priority
);
353 /// adjust priority for an enagled pg
354 void agent_adjust_pg(PG
*pg
, uint64_t old_priority
, uint64_t new_priority
) {
355 std::lock_guard
l(agent_lock
);
356 ceph_assert(new_priority
!= old_priority
);
357 _enqueue(pg
, new_priority
);
358 _dequeue(pg
, old_priority
);
361 /// disable agent for a pg
362 void agent_disable_pg(PG
*pg
, uint64_t old_priority
) {
363 std::lock_guard
l(agent_lock
);
364 _dequeue(pg
, old_priority
);
367 /// note start of an async (evict) op
368 void agent_start_evict_op() {
369 std::lock_guard
l(agent_lock
);
373 /// note finish or cancellation of an async (evict) op
374 void agent_finish_evict_op() {
375 std::lock_guard
l(agent_lock
);
376 ceph_assert(agent_ops
> 0);
378 agent_cond
.notify_all();
381 /// note start of an async (flush) op
382 void agent_start_op(const hobject_t
& oid
) {
383 std::lock_guard
l(agent_lock
);
385 ceph_assert(agent_oids
.count(oid
) == 0);
386 agent_oids
.insert(oid
);
389 /// note finish or cancellation of an async (flush) op
390 void agent_finish_op(const hobject_t
& oid
) {
391 std::lock_guard
l(agent_lock
);
392 ceph_assert(agent_ops
> 0);
394 ceph_assert(agent_oids
.count(oid
) == 1);
395 agent_oids
.erase(oid
);
396 agent_cond
.notify_all();
399 /// check if we are operating on an object
400 bool agent_is_active_oid(const hobject_t
& oid
) {
401 std::lock_guard
l(agent_lock
);
402 return agent_oids
.count(oid
);
405 /// get count of active agent ops
406 int agent_get_num_ops() {
407 std::lock_guard
l(agent_lock
);
411 void agent_inc_high_count() {
412 std::lock_guard
l(agent_lock
);
413 flush_mode_high_count
++;
416 void agent_dec_high_count() {
417 std::lock_guard
l(agent_lock
);
418 flush_mode_high_count
--;
422 /// throttle promotion attempts
423 std::atomic
<unsigned int> promote_probability_millis
{1000}; ///< probability thousands. one word.
424 PromoteCounter promote_counter
;
425 utime_t last_recalibrate
;
426 unsigned long promote_max_objects
, promote_max_bytes
;
429 bool promote_throttle() {
430 // NOTE: lockless! we rely on the probability being a single word.
431 promote_counter
.attempt();
432 if ((unsigned)rand() % 1000 > promote_probability_millis
)
433 return true; // yes throttle (no promote)
434 if (promote_max_objects
&&
435 promote_counter
.objects
> promote_max_objects
)
436 return true; // yes throttle
437 if (promote_max_bytes
&&
438 promote_counter
.bytes
> promote_max_bytes
)
439 return true; // yes throttle
440 return false; // no throttle (promote)
442 void promote_finish(uint64_t bytes
) {
443 promote_counter
.finish(bytes
);
445 void promote_throttle_recalibrate();
446 unsigned get_num_shards() const {
447 return m_objecter_finishers
;
449 Finisher
* get_objecter_finisher(int shard
) {
450 return objecter_finishers
[shard
].get();
453 // -- Objecter, for tiering reads/writes from/to other OSDs --
454 ceph::async::io_context_pool
& poolctx
;
455 std::unique_ptr
<Objecter
> objecter
;
456 int m_objecter_finishers
;
457 std::vector
<std::unique_ptr
<Finisher
>> objecter_finishers
;
460 ceph::mutex watch_lock
= ceph::make_mutex("OSDService::watch_lock");
461 SafeTimer watch_timer
;
462 uint64_t next_notif_id
;
463 uint64_t get_next_id(epoch_t cur_epoch
) {
464 std::lock_guard
l(watch_lock
);
465 return (((uint64_t)cur_epoch
) << 32) | ((uint64_t)(next_notif_id
++));
468 // -- Recovery/Backfill Request Scheduling --
469 ceph::mutex recovery_request_lock
= ceph::make_mutex("OSDService::recovery_request_lock");
470 SafeTimer recovery_request_timer
;
472 // For async recovery sleep
473 bool recovery_needs_sleep
= true;
474 ceph::real_clock::time_point recovery_schedule_time
;
476 // For recovery & scrub & snap
477 ceph::mutex sleep_lock
= ceph::make_mutex("OSDService::sleep_lock");
478 SafeTimer sleep_timer
;
482 std::atomic
<unsigned int> last_tid
{0};
483 ceph_tid_t
get_tid() {
484 return (ceph_tid_t
)last_tid
++;
487 // -- backfill_reservation --
488 Finisher reserver_finisher
;
489 AsyncReserver
<spg_t
, Finisher
> local_reserver
;
490 AsyncReserver
<spg_t
, Finisher
> remote_reserver
;
493 ceph::mutex merge_lock
= ceph::make_mutex("OSD::merge_lock");
494 std::map
<pg_t
,eversion_t
> ready_to_merge_source
; // pg -> version
495 std::map
<pg_t
,std::tuple
<eversion_t
,epoch_t
,epoch_t
>> ready_to_merge_target
; // pg -> (version,les,lec)
496 std::set
<pg_t
> not_ready_to_merge_source
;
497 std::map
<pg_t
,pg_t
> not_ready_to_merge_target
;
498 std::set
<pg_t
> sent_ready_to_merge_source
;
500 void set_ready_to_merge_source(PG
*pg
,
502 void set_ready_to_merge_target(PG
*pg
,
504 epoch_t last_epoch_started
,
505 epoch_t last_epoch_clean
);
506 void set_not_ready_to_merge_source(pg_t source
);
507 void set_not_ready_to_merge_target(pg_t target
, pg_t source
);
508 void clear_ready_to_merge(PG
*pg
);
509 void send_ready_to_merge();
510 void _send_ready_to_merge();
511 void clear_sent_ready_to_merge();
512 void prune_sent_ready_to_merge(const OSDMapRef
& osdmap
);
516 ceph::mutex pg_temp_lock
= ceph::make_mutex("OSDService::pg_temp_lock");
518 std::vector
<int> acting
;
521 std::map
<pg_t
, pg_temp_t
> pg_temp_wanted
;
522 std::map
<pg_t
, pg_temp_t
> pg_temp_pending
;
523 void _sent_pg_temp();
524 friend std::ostream
& operator<<(std::ostream
&, const pg_temp_t
&);
526 void queue_want_pg_temp(pg_t pgid
, const std::vector
<int>& want
,
527 bool forced
= false);
528 void remove_want_pg_temp(pg_t pgid
);
529 void requeue_pg_temp();
532 ceph::mutex pg_created_lock
= ceph::make_mutex("OSDService::pg_created_lock");
533 std::set
<pg_t
> pg_created
;
534 void send_pg_created(pg_t pgid
);
535 void prune_pg_created();
536 void send_pg_created();
538 AsyncReserver
<spg_t
, Finisher
> snap_reserver
;
539 void queue_recovery_context(PG
*pg
, GenContext
<ThreadPool::TPHandle
&> *c
);
540 void queue_for_snap_trim(PG
*pg
);
541 void queue_for_scrub(PG
* pg
, Scrub::scrub_prio_t with_priority
);
543 void queue_scrub_after_repair(PG
* pg
, Scrub::scrub_prio_t with_priority
);
545 /// queue the message (-> event) that all replicas have reserved scrub resources for us
546 void queue_for_scrub_granted(PG
* pg
, Scrub::scrub_prio_t with_priority
);
548 /// queue the message (-> event) that some replicas denied our scrub resources request
549 void queue_for_scrub_denied(PG
* pg
, Scrub::scrub_prio_t with_priority
);
551 /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
552 /// of the primary map being created by the backend.
553 void queue_for_scrub_resched(PG
* pg
, Scrub::scrub_prio_t with_priority
);
555 /// Signals a change in the number of in-flight recovery writes
556 void queue_scrub_pushes_update(PG
* pg
, Scrub::scrub_prio_t with_priority
);
558 /// Signals that all pending updates were applied
559 void queue_scrub_applied_update(PG
* pg
, Scrub::scrub_prio_t with_priority
);
561 /// Signals that the selected chunk (objects range) is available for scrubbing
562 void queue_scrub_chunk_free(PG
* pg
, Scrub::scrub_prio_t with_priority
);
564 /// The chunk selected is blocked by user operations, and cannot be scrubbed now
565 void queue_scrub_chunk_busy(PG
* pg
, Scrub::scrub_prio_t with_priority
);
567 /// The block-range that was locked and prevented the scrubbing - is freed
568 void queue_scrub_unblocking(PG
* pg
, Scrub::scrub_prio_t with_priority
);
570 /// Signals that all write OPs are done
571 void queue_scrub_digest_update(PG
* pg
, Scrub::scrub_prio_t with_priority
);
573 /// Signals that the the local (Primary's) scrub map is ready
574 void queue_scrub_got_local_map(PG
* pg
, Scrub::scrub_prio_t with_priority
);
576 /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
577 void queue_scrub_got_repl_maps(PG
* pg
, Scrub::scrub_prio_t with_priority
);
579 /// Signals that all chunks were handled
580 /// Note: always with high priority, as must be acted upon before the
581 /// next scrub request arrives from the Primary (and the primary is free
582 /// to send the request once the replica's map is received).
583 void queue_scrub_is_finished(PG
* pg
);
585 /// Signals that there are more chunks to handle
586 void queue_scrub_next_chunk(PG
* pg
, Scrub::scrub_prio_t with_priority
);
588 /// Signals that we have finished comparing the maps for this chunk
589 /// Note: required, as in Crimson this operation is 'futurized'.
590 void queue_scrub_maps_compared(PG
* pg
, Scrub::scrub_prio_t with_priority
);
592 void queue_for_rep_scrub(PG
* pg
,
593 Scrub::scrub_prio_t with_high_priority
,
594 unsigned int qu_priority
,
595 Scrub::act_token_t act_token
);
597 /// Signals a change in the number of in-flight recovery writes
598 void queue_scrub_replica_pushes(PG
*pg
, Scrub::scrub_prio_t with_priority
);
600 /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
601 /// trigger a re-check of the availability of the scrub map prepared by the
603 void queue_for_rep_scrub_resched(PG
* pg
,
604 Scrub::scrub_prio_t with_high_priority
,
605 unsigned int qu_priority
,
606 Scrub::act_token_t act_token
);
608 void queue_for_pg_delete(spg_t pgid
, epoch_t e
);
609 bool try_finish_pg_delete(PG
*pg
, unsigned old_pg_num
);
612 // -- pg recovery and associated throttling --
613 ceph::mutex recovery_lock
= ceph::make_mutex("OSDService::recovery_lock");
614 std::list
<std::pair
<epoch_t
, PGRef
> > awaiting_throttle
;
616 /// queue a scrub-related message for a PG
617 template <class MSG_TYPE
>
618 void queue_scrub_event_msg(PG
* pg
,
619 Scrub::scrub_prio_t with_priority
,
620 unsigned int qu_priority
,
621 Scrub::act_token_t act_token
);
623 /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
624 /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
625 template <class MSG_TYPE
>
626 void queue_scrub_event_msg(PG
* pg
, Scrub::scrub_prio_t with_priority
);
628 utime_t defer_recovery_until
;
629 uint64_t recovery_ops_active
;
630 uint64_t recovery_ops_reserved
;
631 bool recovery_paused
;
632 #ifdef DEBUG_RECOVERY_OIDS
633 std::map
<spg_t
, std::set
<hobject_t
> > recovery_oids
;
635 bool _recover_now(uint64_t *available_pushes
);
636 void _maybe_queue_recovery();
637 void _queue_for_recovery(
638 std::pair
<epoch_t
, PGRef
> p
, uint64_t reserved_pushes
);
640 void start_recovery_op(PG
*pg
, const hobject_t
& soid
);
641 void finish_recovery_op(PG
*pg
, const hobject_t
& soid
, bool dequeue
);
642 bool is_recovery_active();
643 void release_reserved_pushes(uint64_t pushes
);
644 void defer_recovery(float defer_for
) {
645 defer_recovery_until
= ceph_clock_now();
646 defer_recovery_until
+= defer_for
;
648 void pause_recovery() {
649 std::lock_guard
l(recovery_lock
);
650 recovery_paused
= true;
652 bool recovery_is_paused() {
653 std::lock_guard
l(recovery_lock
);
654 return recovery_paused
;
656 void unpause_recovery() {
657 std::lock_guard
l(recovery_lock
);
658 recovery_paused
= false;
659 _maybe_queue_recovery();
661 void kick_recovery_queue() {
662 std::lock_guard
l(recovery_lock
);
663 _maybe_queue_recovery();
665 void clear_queued_recovery(PG
*pg
) {
666 std::lock_guard
l(recovery_lock
);
667 awaiting_throttle
.remove_if(
668 [pg
](decltype(awaiting_throttle
)::const_reference awaiting
) {
669 return awaiting
.second
.get() == pg
;
673 unsigned get_target_pg_log_entries() const;
675 // delayed pg activation
676 void queue_for_recovery(PG
*pg
) {
677 std::lock_guard
l(recovery_lock
);
679 if (pg
->is_forced_recovery_or_backfill()) {
680 awaiting_throttle
.push_front(std::make_pair(pg
->get_osdmap()->get_epoch(), pg
));
682 awaiting_throttle
.push_back(std::make_pair(pg
->get_osdmap()->get_epoch(), pg
));
684 _maybe_queue_recovery();
686 void queue_recovery_after_sleep(PG
*pg
, epoch_t queued
, uint64_t reserved_pushes
) {
687 std::lock_guard
l(recovery_lock
);
688 _queue_for_recovery(std::make_pair(queued
, pg
), reserved_pushes
);
691 void queue_check_readable(spg_t spgid
,
693 ceph::signedspan delay
= ceph::signedspan::zero());
695 // osd map cache (past osd maps)
696 ceph::mutex map_cache_lock
= ceph::make_mutex("OSDService::map_cache_lock");
697 SharedLRU
<epoch_t
, const OSDMap
> map_cache
;
698 SimpleLRU
<epoch_t
, ceph::buffer::list
> map_bl_cache
;
699 SimpleLRU
<epoch_t
, ceph::buffer::list
> map_bl_inc_cache
;
701 OSDMapRef
try_get_map(epoch_t e
);
702 OSDMapRef
get_map(epoch_t e
) {
703 OSDMapRef
ret(try_get_map(e
));
707 OSDMapRef
add_map(OSDMap
*o
) {
708 std::lock_guard
l(map_cache_lock
);
711 OSDMapRef
_add_map(OSDMap
*o
);
713 void _add_map_bl(epoch_t e
, ceph::buffer::list
& bl
);
714 bool get_map_bl(epoch_t e
, ceph::buffer::list
& bl
) {
715 std::lock_guard
l(map_cache_lock
);
716 return _get_map_bl(e
, bl
);
718 bool _get_map_bl(epoch_t e
, ceph::buffer::list
& bl
);
720 void _add_map_inc_bl(epoch_t e
, ceph::buffer::list
& bl
);
721 bool get_inc_map_bl(epoch_t e
, ceph::buffer::list
& bl
);
723 /// identify split child pgids over a osdmap interval
724 void identify_splits_and_merges(
728 std::set
<std::pair
<spg_t
,epoch_t
>> *new_children
,
729 std::set
<std::pair
<spg_t
,epoch_t
>> *merge_pgs
);
731 void need_heartbeat_peer_update();
735 void start_shutdown();
736 void shutdown_reserver();
740 ceph::mutex stat_lock
= ceph::make_mutex("OSDService::stat_lock");
744 void set_statfs(const struct store_statfs_t
&stbuf
,
745 osd_alert_list_t
& alerts
);
746 osd_stat_t
set_osd_stat(std::vector
<int>& hb_peers
, int num_pgs
);
747 void inc_osd_stat_repaired(void);
748 float compute_adjusted_ratio(osd_stat_t new_stat
, float *pratio
, uint64_t adjust_used
= 0);
749 osd_stat_t
get_osd_stat() {
750 std::lock_guard
l(stat_lock
);
752 osd_stat
.up_from
= up_epoch
;
753 osd_stat
.seq
= ((uint64_t)osd_stat
.up_from
<< 32) + seq
;
756 uint64_t get_osd_stat_seq() {
757 std::lock_guard
l(stat_lock
);
760 void get_hb_pingtime(std::map
<int, osd_stat_t::Interfaces
> *pp
)
762 std::lock_guard
l(stat_lock
);
763 *pp
= osd_stat
.hb_pingtime
;
767 // -- OSD Full Status --
769 friend TestOpsSocketHook
;
770 mutable ceph::mutex full_status_lock
= ceph::make_mutex("OSDService::full_status_lock");
771 enum s_names
{ INVALID
= -1, NONE
, NEARFULL
, BACKFILLFULL
, FULL
, FAILSAFE
} cur_state
; // ascending
772 const char *get_full_state_name(s_names s
) const {
774 case NONE
: return "none";
775 case NEARFULL
: return "nearfull";
776 case BACKFILLFULL
: return "backfillfull";
777 case FULL
: return "full";
778 case FAILSAFE
: return "failsafe";
779 default: return "???";
782 s_names
get_full_state(std::string type
) const {
785 else if (type
== "failsafe")
787 else if (type
== "full")
789 else if (type
== "backfillfull")
791 else if (type
== "nearfull")
796 double cur_ratio
, physical_ratio
; ///< current utilization
797 mutable int64_t injectfull
= 0;
798 s_names injectfull_state
= NONE
;
799 float get_failsafe_full_ratio();
800 bool _check_inject_full(DoutPrefixProvider
*dpp
, s_names type
) const;
801 bool _check_full(DoutPrefixProvider
*dpp
, s_names type
) const;
803 void check_full_status(float ratio
, float pratio
);
804 s_names
recalc_full_state(float ratio
, float pratio
, std::string
&inject
);
805 bool _tentative_full(DoutPrefixProvider
*dpp
, s_names type
, uint64_t adjust_used
, osd_stat_t
);
806 bool check_failsafe_full(DoutPrefixProvider
*dpp
) const;
807 bool check_full(DoutPrefixProvider
*dpp
) const;
808 bool tentative_backfill_full(DoutPrefixProvider
*dpp
, uint64_t adjust_used
, osd_stat_t
);
809 bool check_backfill_full(DoutPrefixProvider
*dpp
) const;
810 bool check_nearfull(DoutPrefixProvider
*dpp
) const;
811 bool is_failsafe_full() const;
812 bool is_full() const;
813 bool is_backfillfull() const;
814 bool is_nearfull() const;
815 bool need_fullness_update(); ///< osdmap state needs update
816 void set_injectfull(s_names type
, int64_t count
);
821 // protects access to boot_epoch, up_epoch, bind_epoch
822 mutable ceph::mutex epoch_lock
= ceph::make_mutex("OSDService::epoch_lock");
823 epoch_t boot_epoch
; // _first_ epoch we were marked up (after this process started)
824 epoch_t up_epoch
; // _most_recent_ epoch we were marked up
825 epoch_t bind_epoch
; // epoch we last did a bind to new ip:ports
828 * Retrieve the boot_, up_, and bind_ epochs the OSD has std::set. The params
829 * can be NULL if you don't care about them.
831 void retrieve_epochs(epoch_t
*_boot_epoch
, epoch_t
*_up_epoch
,
832 epoch_t
*_bind_epoch
) const;
834 * Std::set the boot, up, and bind epochs. Any NULL params will not be std::set.
836 void set_epochs(const epoch_t
*_boot_epoch
, const epoch_t
*_up_epoch
,
837 const epoch_t
*_bind_epoch
);
838 epoch_t
get_boot_epoch() const {
840 retrieve_epochs(&ret
, NULL
, NULL
);
843 epoch_t
get_up_epoch() const {
845 retrieve_epochs(NULL
, &ret
, NULL
);
848 epoch_t
get_bind_epoch() const {
850 retrieve_epochs(NULL
, NULL
, &ret
);
854 void request_osdmap_update(epoch_t e
);
857 ceph::mutex hb_stamp_lock
= ceph::make_mutex("OSDServce::hb_stamp_lock");
859 /// osd -> heartbeat stamps
860 std::vector
<HeartbeatStampsRef
> hb_stamps
;
862 /// get or create a ref for a peer's HeartbeatStamps
863 HeartbeatStampsRef
get_hb_stamps(unsigned osd
);
866 // Timer for readable leases
867 ceph::timer
<ceph::mono_clock
> mono_timer
= ceph::timer
<ceph::mono_clock
>{ceph::construct_suspended
};
869 void queue_renew_lease(epoch_t epoch
, spg_t spgid
);
872 ceph::mutex is_stopping_lock
= ceph::make_mutex("OSDService::is_stopping_lock");
873 ceph::condition_variable is_stopping_cond
;
878 std::atomic
<int> state
{NOT_STOPPING
};
879 int get_state() const {
882 void set_state(int s
) {
885 bool is_stopping() const {
886 return state
== STOPPING
;
888 bool is_preparing_to_stop() const {
889 return state
== PREPARING_TO_STOP
;
891 bool prepare_to_stop();
896 ceph::mutex pgid_lock
= ceph::make_mutex("OSDService::pgid_lock");
897 std::map
<spg_t
, int> pgid_tracker
;
898 std::map
<spg_t
, PG
*> live_pgs
;
899 void add_pgid(spg_t pgid
, PG
*pg
);
900 void remove_pgid(spg_t pgid
, PG
*pg
);
901 void dump_live_pgids();
904 explicit OSDService(OSD
*osd
, ceph::async::io_context_pool
& poolctx
);
905 ~OSDService() = default;
910 Each PG slot includes queues for events that are processing and/or waiting
911 for a PG to be materialized in the slot.
913 These are the constraints:
915 - client ops must remained ordered by client, regardless of std::map epoch
916 - peering messages/events from peers must remain ordered by peer
917 - peering messages and client ops need not be ordered relative to each other
919 - some peering events can create a pg (e.g., notify)
920 - the query peering event can proceed when a PG doesn't exist
922 Implementation notes:
924 - everybody waits for split. If the OSD has the parent PG it will instantiate
925 the PGSlot early and mark it waiting_for_split. Everything will wait until
926 the parent is able to commit the split operation and the child PG's are
927 materialized in the child slots.
929 - every event has an epoch property and will wait for the OSDShard to catch
930 up to that epoch. For example, if we get a peering event from a future
931 epoch, the event will wait in the slot until the local OSD has caught up.
932 (We should be judicious in specifying the required epoch [by, e.g., setting
933 it to the same_interval_since epoch] so that we don't wait for epochs that
934 don't affect the given PG.)
936 - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
937 OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
938 peering events are queued up by epoch required.
940 - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
941 materialized the PG), we wake *all* waiting items. (This could be optimized,
942 probably, but we don't bother.) We always requeue peering items ahead of
945 - some peering events are marked !peering_requires_pg (PGQuery). if we do
946 not have a PG these are processed immediately (under the shard lock).
948 - we do not have a PG present, we check if the slot maps to the current host.
949 if so, we either queue the item and wait for the PG to materialize, or
950 (if the event is a pg creating event like PGNotify), we materialize the PG.
952 - when we advance the osdmap on the OSDShard, we scan pg slots and
953 discard any slots with no pg (and not waiting_for_split) that no
954 longer std::map to the current host.
958 struct OSDShardPGSlot
{
959 using OpSchedulerItem
= ceph::osd::scheduler::OpSchedulerItem
;
960 PGRef pg
; ///< pg reference
961 std::deque
<OpSchedulerItem
> to_process
; ///< order items for this slot
962 int num_running
= 0; ///< _process threads doing pg lookup/lock
964 std::deque
<OpSchedulerItem
> waiting
; ///< waiting for pg (or map + pg)
966 /// waiting for map (peering evt)
967 std::map
<epoch_t
,std::deque
<OpSchedulerItem
>> waiting_peering
;
969 /// incremented by wake_pg_waiters; indicates racing _process threads
970 /// should bail out (their op has been requeued)
971 uint64_t requeue_seq
= 0;
973 /// waiting for split child to materialize in these epoch(s)
974 std::set
<epoch_t
> waiting_for_split
;
977 boost::intrusive::set_member_hook
<> pg_epoch_item
;
979 /// waiting for a merge (source or target) by this epoch
980 epoch_t waiting_for_merge_epoch
= 0;
984 const unsigned shard_id
;
988 std::string shard_name
;
990 std::string sdata_wait_lock_name
;
991 ceph::mutex sdata_wait_lock
;
992 ceph::condition_variable sdata_cond
;
993 int waiting_threads
= 0;
995 ceph::mutex osdmap_lock
; ///< protect shard_osdmap updates vs users w/o shard_lock
996 OSDMapRef shard_osdmap
;
998 OSDMapRef
get_osdmap() {
999 std::lock_guard
l(osdmap_lock
);
1000 return shard_osdmap
;
1003 std::string shard_lock_name
;
1004 ceph::mutex shard_lock
; ///< protects remaining members below
1006 /// map of slots for each spg_t. maintains ordering of items dequeued
1007 /// from scheduler while _process thread drops shard lock to acquire the
1008 /// pg lock. stale slots are removed by consume_map.
1009 std::unordered_map
<spg_t
,std::unique_ptr
<OSDShardPGSlot
>> pg_slots
;
1011 struct pg_slot_compare_by_epoch
{
1012 bool operator()(const OSDShardPGSlot
& l
, const OSDShardPGSlot
& r
) const {
1013 return l
.epoch
< r
.epoch
;
1017 /// maintain an ordering of pg slots by pg epoch
1018 boost::intrusive::multiset
<
1020 boost::intrusive::member_hook
<
1022 boost::intrusive::set_member_hook
<>,
1023 &OSDShardPGSlot::pg_epoch_item
>,
1024 boost::intrusive::compare
<pg_slot_compare_by_epoch
>> pg_slots_by_epoch
;
1025 int waiting_for_min_pg_epoch
= 0;
1026 ceph::condition_variable min_pg_epoch_cond
;
1029 ceph::osd::scheduler::OpSchedulerRef scheduler
;
1031 bool stop_waiting
= false;
1033 ContextQueue context_queue
;
1035 void _attach_pg(OSDShardPGSlot
*slot
, PG
*pg
);
1036 void _detach_pg(OSDShardPGSlot
*slot
);
1038 void update_pg_epoch(OSDShardPGSlot
*slot
, epoch_t epoch
);
1039 epoch_t
get_min_pg_epoch();
1040 void wait_min_pg_epoch(epoch_t need
);
1042 /// return newest epoch we are waiting for
1043 epoch_t
get_max_waiting_epoch();
1045 /// push osdmap into shard
1047 const OSDMapRef
& osdmap
,
1048 unsigned *pushes_to_free
);
1050 int _wake_pg_slot(spg_t pgid
, OSDShardPGSlot
*slot
);
1052 void identify_splits_and_merges(
1053 const OSDMapRef
& as_of_osdmap
,
1054 std::set
<std::pair
<spg_t
,epoch_t
>> *split_children
,
1055 std::set
<std::pair
<spg_t
,epoch_t
>> *merge_pgs
);
1056 void _prime_splits(std::set
<std::pair
<spg_t
,epoch_t
>> *pgids
);
1057 void prime_splits(const OSDMapRef
& as_of_osdmap
,
1058 std::set
<std::pair
<spg_t
,epoch_t
>> *pgids
);
1059 void prime_merges(const OSDMapRef
& as_of_osdmap
,
1060 std::set
<std::pair
<spg_t
,epoch_t
>> *merge_pgs
);
1061 void register_and_wake_split_child(PG
*pg
);
1062 void unprime_split_children(spg_t parent
, unsigned old_pg_num
);
1063 void update_scheduler_config();
1064 std::string
get_scheduler_type();
1072 class OSD
: public Dispatcher
,
1073 public md_config_obs_t
{
1074 using OpSchedulerItem
= ceph::osd::scheduler::OpSchedulerItem
;
1078 ceph::mutex osd_lock
= ceph::make_mutex("OSD::osd_lock");
1079 SafeTimer tick_timer
; // safe timer (osd_lock)
1081 // Tick timer for those stuff that do not need osd_lock
1082 ceph::mutex tick_timer_lock
= ceph::make_mutex("OSD::tick_timer_lock");
1083 SafeTimer tick_timer_without_osd_lock
;
1084 std::string gss_ktfile_client
{};
1087 // config observer bits
1088 const char** get_tracked_conf_keys() const override
;
1089 void handle_conf_change(const ConfigProxy
& conf
,
1090 const std::set
<std::string
> &changed
) override
;
1091 void update_log_config();
1092 void check_config();
1096 const double OSD_TICK_INTERVAL
= { 1.0 };
1097 double get_tick_interval() const;
1099 Messenger
*cluster_messenger
;
1100 Messenger
*client_messenger
;
1101 Messenger
*objecter_messenger
;
1102 MonClient
*monc
; // check the "monc helpers" list before accessing directly
1104 PerfCounters
*logger
;
1105 PerfCounters
*recoverystate_perf
;
1106 std::unique_ptr
<ObjectStore
> store
;
1108 FuseStore
*fuse_store
= nullptr;
1110 LogClient log_client
;
1114 std::string dev_path
, journal_path
;
1116 ceph_release_t last_require_osd_release
{ceph_release_t::unknown
};
1119 size_t numa_cpu_set_size
= 0;
1120 cpu_set_t numa_cpu_set
;
1122 bool store_is_rotational
= true;
1123 bool journal_is_rotational
= true;
1125 ZTracer::Endpoint trace_endpoint
;
1126 PerfCounters
* create_logger();
1127 PerfCounters
* create_recoverystate_perf();
1129 void tick_without_osd_lock();
1130 void _dispatch(Message
*m
);
1131 void dispatch_op(OpRequestRef op
);
1133 void check_osdmap_features();
1136 friend class OSDSocketHook
;
1137 class OSDSocketHook
*asok_hook
;
1138 using PGRefOrError
= std::tuple
<std::optional
<PGRef
>, int>;
1139 PGRefOrError
locate_asok_target(const cmdmap_t
& cmdmap
,
1140 std::stringstream
& ss
, bool only_primary
);
1141 int asok_route_to_pg(bool only_primary
,
1142 std::string_view prefix
,
1145 std::stringstream
& ss
,
1146 const bufferlist
& inbl
,
1148 std::function
<void(int, const std::string
&, bufferlist
&)> on_finish
);
1150 std::string_view prefix
,
1151 const cmdmap_t
& cmdmap
,
1153 const ceph::buffer::list
& inbl
,
1154 std::function
<void(int,const std::string
&,ceph::buffer::list
&)> on_finish
);
1157 int get_nodeid() { return whoami
; }
1159 static ghobject_t
get_osdmap_pobject_name(epoch_t epoch
) {
1161 snprintf(foo
, sizeof(foo
), "osdmap.%d", epoch
);
1162 return ghobject_t(hobject_t(sobject_t(object_t(foo
), 0)));
1164 static ghobject_t
get_inc_osdmap_pobject_name(epoch_t epoch
) {
1166 snprintf(foo
, sizeof(foo
), "inc_osdmap.%d", epoch
);
1167 return ghobject_t(hobject_t(sobject_t(object_t(foo
), 0)));
1170 static ghobject_t
make_snapmapper_oid() {
1171 return ghobject_t(hobject_t(
1173 object_t("snapmapper"),
1176 static ghobject_t
make_purged_snaps_oid() {
1177 return ghobject_t(hobject_t(
1179 object_t("purged_snaps"),
1183 static ghobject_t
make_pg_log_oid(spg_t pg
) {
1184 std::stringstream ss
;
1185 ss
<< "pglog_" << pg
;
1188 return ghobject_t(hobject_t(sobject_t(object_t(s
.c_str()), 0)));
1191 static ghobject_t
make_pg_biginfo_oid(spg_t pg
) {
1192 std::stringstream ss
;
1193 ss
<< "pginfo_" << pg
;
1196 return ghobject_t(hobject_t(sobject_t(object_t(s
.c_str()), 0)));
1198 static ghobject_t
make_infos_oid() {
1199 hobject_t
oid(sobject_t("infos", CEPH_NOSNAP
));
1200 return ghobject_t(oid
);
1203 static ghobject_t
make_final_pool_info_oid(int64_t pool
) {
1207 object_t(std::string("final_pool_") + stringify(pool
)),
1211 static ghobject_t
make_pg_num_history_oid() {
1212 return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP
)));
1215 static void recursive_remove_collection(CephContext
* cct
,
1221 * get_osd_initial_compat_set()
1223 * Get the initial feature std::set for this OSD. Features
1224 * here are automatically upgraded.
1226 * Return value: Initial osd CompatSet
1228 static CompatSet
get_osd_initial_compat_set();
1231 * get_osd_compat_set()
1233 * Get all features supported by this OSD
1235 * Return value: CompatSet of all supported features
1237 static CompatSet
get_osd_compat_set();
1242 class C_Tick_WithoutOSDLock
;
1244 // -- config settings --
1245 float m_osd_pg_epoch_max_lag_factor
;
1248 OSDSuperblock superblock
;
1250 void write_superblock();
1251 void write_superblock(ObjectStore::Transaction
& t
);
1252 int read_superblock();
1254 void clear_temp_objects();
1256 CompatSet osd_compat
;
1261 STATE_INITIALIZING
= 1,
1266 STATE_WAITING_FOR_HEALTHY
1269 static const char *get_state_name(int s
) {
1271 case STATE_INITIALIZING
: return "initializing";
1272 case STATE_PREBOOT
: return "preboot";
1273 case STATE_BOOTING
: return "booting";
1274 case STATE_ACTIVE
: return "active";
1275 case STATE_STOPPING
: return "stopping";
1276 case STATE_WAITING_FOR_HEALTHY
: return "waiting_for_healthy";
1277 default: return "???";
1282 std::atomic
<int> state
{STATE_INITIALIZING
};
1285 int get_state() const {
1288 void set_state(int s
) {
1291 bool is_initializing() const {
1292 return state
== STATE_INITIALIZING
;
1294 bool is_preboot() const {
1295 return state
== STATE_PREBOOT
;
1297 bool is_booting() const {
1298 return state
== STATE_BOOTING
;
1300 bool is_active() const {
1301 return state
== STATE_ACTIVE
;
1303 bool is_stopping() const {
1304 return state
== STATE_STOPPING
;
1306 bool is_waiting_for_healthy() const {
1307 return state
== STATE_WAITING_FOR_HEALTHY
;
1312 ShardedThreadPool osd_op_tp
;
1314 void get_latest_osdmap();
1318 void dispatch_session_waiting(const ceph::ref_t
<Session
>& session
, OSDMapRef osdmap
);
1320 ceph::mutex session_waiting_lock
= ceph::make_mutex("OSD::session_waiting_lock");
1321 std::set
<ceph::ref_t
<Session
>> session_waiting_for_map
;
1323 /// Caller assumes refs for included Sessions
1324 void get_sessions_waiting_for_map(std::set
<ceph::ref_t
<Session
>> *out
) {
1325 std::lock_guard
l(session_waiting_lock
);
1326 out
->swap(session_waiting_for_map
);
1328 void register_session_waiting_on_map(const ceph::ref_t
<Session
>& session
) {
1329 std::lock_guard
l(session_waiting_lock
);
1330 session_waiting_for_map
.insert(session
);
1332 void clear_session_waiting_on_map(const ceph::ref_t
<Session
>& session
) {
1333 std::lock_guard
l(session_waiting_lock
);
1334 session_waiting_for_map
.erase(session
);
1336 void dispatch_sessions_waiting_on_map() {
1337 std::set
<ceph::ref_t
<Session
>> sessions_to_check
;
1338 get_sessions_waiting_for_map(&sessions_to_check
);
1339 for (auto i
= sessions_to_check
.begin();
1340 i
!= sessions_to_check
.end();
1341 sessions_to_check
.erase(i
++)) {
1342 std::lock_guard l
{(*i
)->session_dispatch_lock
};
1343 dispatch_session_waiting(*i
, get_osdmap());
1346 void session_handle_reset(const ceph::ref_t
<Session
>& session
) {
1347 std::lock_guard
l(session
->session_dispatch_lock
);
1348 clear_session_waiting_on_map(session
);
1350 session
->clear_backoffs();
1352 /* Messages have connection refs, we need to clear the
1353 * connection->session->message->connection
1354 * cycles which result.
1357 session
->waiting_on_map
.clear_and_dispose(TrackedOp::Putter());
1362 * @defgroup monc helpers
1364 * Right now we only have the one
1368 * Ask the Monitors for a sequence of OSDMaps.
1370 * @param epoch The epoch to start with when replying
1371 * @param force_request True if this request forces a new subscription to
1372 * the monitors; false if an outstanding request that encompasses it is
1375 void osdmap_subscribe(version_t epoch
, bool force_request
);
1376 /** @} monc helpers */
1378 ceph::mutex osdmap_subscribe_lock
= ceph::make_mutex("OSD::osdmap_subscribe_lock");
1379 epoch_t latest_subscribed_epoch
{0};
1382 /// information about a heartbeat peer
1383 struct HeartbeatInfo
{
1385 ConnectionRef con_front
; ///< peer connection (front)
1386 ConnectionRef con_back
; ///< peer connection (back)
1387 utime_t first_tx
; ///< time we sent our first ping request
1388 utime_t last_tx
; ///< last time we sent a ping request
1389 utime_t last_rx_front
; ///< last time we got a ping reply on the front side
1390 utime_t last_rx_back
; ///< last time we got a ping reply on the back side
1391 epoch_t epoch
; ///< most recent epoch we wanted this peer
1392 /// number of connections we send and receive heartbeat pings/replies
1393 static constexpr int HEARTBEAT_MAX_CONN
= 2;
1394 /// history of inflight pings, arranging by timestamp we sent
1395 /// send time -> deadline -> remaining replies
1396 std::map
<utime_t
, std::pair
<utime_t
, int>> ping_history
;
1398 utime_t hb_interval_start
;
1399 uint32_t hb_average_count
= 0;
1400 uint32_t hb_index
= 0;
1402 uint32_t hb_total_back
= 0;
1403 uint32_t hb_min_back
= UINT_MAX
;
1404 uint32_t hb_max_back
= 0;
1405 std::vector
<uint32_t> hb_back_pingtime
;
1406 std::vector
<uint32_t> hb_back_min
;
1407 std::vector
<uint32_t> hb_back_max
;
1409 uint32_t hb_total_front
= 0;
1410 uint32_t hb_min_front
= UINT_MAX
;
1411 uint32_t hb_max_front
= 0;
1412 std::vector
<uint32_t> hb_front_pingtime
;
1413 std::vector
<uint32_t> hb_front_min
;
1414 std::vector
<uint32_t> hb_front_max
;
1416 bool is_stale(utime_t stale
) const {
1417 if (ping_history
.empty()) {
1420 utime_t oldest_deadline
= ping_history
.begin()->second
.first
;
1421 return oldest_deadline
<= stale
;
1424 bool is_unhealthy(utime_t now
) const {
1425 if (ping_history
.empty()) {
1426 /// we haven't sent a ping yet or we have got all replies,
1427 /// in either way we are safe and healthy for now
1431 utime_t oldest_deadline
= ping_history
.begin()->second
.first
;
1432 return now
> oldest_deadline
;
1435 bool is_healthy(utime_t now
) const {
1436 if (last_rx_front
== utime_t() || last_rx_back
== utime_t()) {
1437 // only declare to be healthy until we have received the first
1438 // replies from both front/back connections
1441 return !is_unhealthy(now
);
1444 void clear_mark_down(Connection
*except
= nullptr) {
1445 if (con_back
&& con_back
!= except
) {
1446 con_back
->mark_down();
1447 con_back
->clear_priv();
1448 con_back
.reset(nullptr);
1450 if (con_front
&& con_front
!= except
) {
1451 con_front
->mark_down();
1452 con_front
->clear_priv();
1453 con_front
.reset(nullptr);
1458 ceph::mutex heartbeat_lock
= ceph::make_mutex("OSD::heartbeat_lock");
1459 std::map
<int, int> debug_heartbeat_drops_remaining
;
1460 ceph::condition_variable heartbeat_cond
;
1461 bool heartbeat_stop
;
1462 std::atomic
<bool> heartbeat_need_update
;
1463 std::map
<int,HeartbeatInfo
> heartbeat_peers
; ///< map of osd id to HeartbeatInfo
1464 utime_t last_mon_heartbeat
;
1465 Messenger
*hb_front_client_messenger
;
1466 Messenger
*hb_back_client_messenger
;
1467 Messenger
*hb_front_server_messenger
;
1468 Messenger
*hb_back_server_messenger
;
1469 utime_t last_heartbeat_resample
; ///< last time we chose random peers in waiting-for-healthy state
1470 double daily_loadavg
;
1471 ceph::mono_time startup_time
;
1473 // Track ping repsonse times using vector as a circular buffer
1474 // MUST BE A POWER OF 2
1475 const uint32_t hb_vector_size
= 16;
1477 void _add_heartbeat_peer(int p
);
1478 void _remove_heartbeat_peer(int p
);
1479 bool heartbeat_reset(Connection
*con
);
1480 void maybe_update_heartbeat_peers();
1481 void reset_heartbeat_peers(bool all
);
1482 bool heartbeat_peers_need_update() {
1483 return heartbeat_need_update
.load();
1485 void heartbeat_set_peers_need_update() {
1486 heartbeat_need_update
.store(true);
1488 void heartbeat_clear_peers_need_update() {
1489 heartbeat_need_update
.store(false);
1492 void heartbeat_check();
1493 void heartbeat_entry();
1494 void need_heartbeat_peer_update();
1496 void heartbeat_kick() {
1497 std::lock_guard
l(heartbeat_lock
);
1498 heartbeat_cond
.notify_all();
1501 struct T_Heartbeat
: public Thread
{
1503 explicit T_Heartbeat(OSD
*o
) : osd(o
) {}
1504 void *entry() override
{
1505 osd
->heartbeat_entry();
1511 bool heartbeat_dispatch(Message
*m
);
1513 struct HeartbeatDispatcher
: public Dispatcher
{
1515 explicit HeartbeatDispatcher(OSD
*o
) : Dispatcher(o
->cct
), osd(o
) {}
1517 bool ms_can_fast_dispatch_any() const override
{ return true; }
1518 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1519 switch (m
->get_type()) {
1527 void ms_fast_dispatch(Message
*m
) override
{
1528 osd
->heartbeat_dispatch(m
);
1530 bool ms_dispatch(Message
*m
) override
{
1531 return osd
->heartbeat_dispatch(m
);
1533 bool ms_handle_reset(Connection
*con
) override
{
1534 return osd
->heartbeat_reset(con
);
1536 void ms_handle_remote_reset(Connection
*con
) override
{}
1537 bool ms_handle_refused(Connection
*con
) override
{
1538 return osd
->ms_handle_refused(con
);
1540 int ms_handle_authentication(Connection
*con
) override
{
1543 } heartbeat_dispatcher
;
1547 std::list
<OpRequestRef
> finished
;
1549 void take_waiters(std::list
<OpRequestRef
>& ls
) {
1550 ceph_assert(ceph_mutex_is_locked(osd_lock
));
1551 finished
.splice(finished
.end(), ls
);
1555 // -- op tracking --
1556 OpTracker op_tracker
;
1557 void test_ops(std::string command
, std::string args
, std::ostream
& ss
);
1558 friend class TestOpsSocketHook
;
1559 TestOpsSocketHook
*test_ops_hook
;
1560 friend struct C_FinishSplits
;
1561 friend struct C_OpenPGs
;
1566 * The ordered op delivery chain is:
1568 * fast dispatch -> scheduler back
1569 * scheduler front <-> to_process back
1570 * to_process front -> RunVis(item)
1573 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1574 * pushed back up into to_process and/or scheduler while order is preserved.
1576 * Multiple worker threads can operate on each shard.
1578 * Under normal circumstances, num_running == to_process.size(). There are
1579 * two times when that is not true: (1) when waiting_for_pg == true and
1580 * to_process is accumulating requests that are waiting for the pg to be
1581 * instantiated; in that case they will all get requeued together by
1582 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
1583 * and already requeued the items.
1585 friend class ceph::osd::scheduler::PGOpItem
;
1586 friend class ceph::osd::scheduler::PGPeeringItem
;
1587 friend class ceph::osd::scheduler::PGRecovery
;
1588 friend class ceph::osd::scheduler::PGRecoveryMsg
;
1589 friend class ceph::osd::scheduler::PGDelete
;
1592 : public ShardedThreadPool::ShardedWQ
<OpSchedulerItem
>
1595 bool m_fast_shutdown
= false;
1600 ShardedThreadPool
* tp
)
1601 : ShardedThreadPool::ShardedWQ
<OpSchedulerItem
>(ti
, si
, tp
),
1605 void _add_slot_waiter(
1607 OSDShardPGSlot
*slot
,
1608 OpSchedulerItem
&& qi
);
1610 /// try to do some work
1611 void _process(uint32_t thread_index
, ceph::heartbeat_handle_d
*hb
) override
;
1613 void stop_for_fast_shutdown();
1615 /// enqueue a new item
1616 void _enqueue(OpSchedulerItem
&& item
) override
;
1618 /// requeue an old item (at the front of the line)
1619 void _enqueue_front(OpSchedulerItem
&& item
) override
;
1621 void return_waiting_threads() override
{
1622 for(uint32_t i
= 0; i
< osd
->num_shards
; i
++) {
1623 OSDShard
* sdata
= osd
->shards
[i
];
1624 assert (NULL
!= sdata
);
1625 std::scoped_lock l
{sdata
->sdata_wait_lock
};
1626 sdata
->stop_waiting
= true;
1627 sdata
->sdata_cond
.notify_all();
1631 void stop_return_waiting_threads() override
{
1632 for(uint32_t i
= 0; i
< osd
->num_shards
; i
++) {
1633 OSDShard
* sdata
= osd
->shards
[i
];
1634 assert (NULL
!= sdata
);
1635 std::scoped_lock l
{sdata
->sdata_wait_lock
};
1636 sdata
->stop_waiting
= false;
1640 void dump(ceph::Formatter
*f
) {
1641 for(uint32_t i
= 0; i
< osd
->num_shards
; i
++) {
1642 auto &&sdata
= osd
->shards
[i
];
1644 char queue_name
[32] = {0};
1645 snprintf(queue_name
, sizeof(queue_name
), "%s%" PRIu32
, "OSD:ShardedOpWQ:", i
);
1646 ceph_assert(NULL
!= sdata
);
1648 std::scoped_lock l
{sdata
->shard_lock
};
1649 f
->open_object_section(queue_name
);
1650 sdata
->scheduler
->dump(*f
);
1655 bool is_shard_empty(uint32_t thread_index
) override
{
1656 uint32_t shard_index
= thread_index
% osd
->num_shards
;
1657 auto &&sdata
= osd
->shards
[shard_index
];
1659 std::lock_guard
l(sdata
->shard_lock
);
1660 if (thread_index
< osd
->num_shards
) {
1661 return sdata
->scheduler
->empty() && sdata
->context_queue
.empty();
1663 return sdata
->scheduler
->empty();
1667 void handle_oncommits(std::list
<Context
*>& oncommits
) {
1668 for (auto p
: oncommits
) {
1675 void enqueue_op(spg_t pg
, OpRequestRef
&& op
, epoch_t epoch
);
1677 PGRef pg
, OpRequestRef op
,
1678 ThreadPool::TPHandle
&handle
);
1680 void enqueue_peering_evt(
1682 PGPeeringEventRef ref
);
1683 void dequeue_peering_evt(
1686 PGPeeringEventRef ref
,
1687 ThreadPool::TPHandle
& handle
);
1689 void dequeue_delete(
1693 ThreadPool::TPHandle
& handle
);
1696 friend struct OSDShard
;
1697 friend class PrimaryLogPG
;
1698 friend class PgScrubber
;
1704 // TODO: switch to std::atomic<OSDMapRef> when C++20 will be available.
1706 void set_osdmap(OSDMapRef osdmap
) {
1707 std::atomic_store(&_osdmap
, osdmap
);
1709 OSDMapRef
get_osdmap() const {
1710 return std::atomic_load(&_osdmap
);
1712 epoch_t
get_osdmap_epoch() const {
1713 // XXX: performance?
1714 auto osdmap
= get_osdmap();
1715 return osdmap
? osdmap
->get_epoch() : 0;
1718 pool_pg_num_history_t pg_num_history
;
1720 ceph::shared_mutex map_lock
= ceph::make_shared_mutex("OSD::map_lock");
1721 std::list
<OpRequestRef
> waiting_for_osdmap
;
1722 std::deque
<utime_t
> osd_markdown_log
;
1724 friend struct send_map_on_destruct
;
1726 void wait_for_new_map(OpRequestRef op
);
1727 void handle_osd_map(class MOSDMap
*m
);
1728 void _committed_osd_maps(epoch_t first
, epoch_t last
, class MOSDMap
*m
);
1729 void trim_maps(epoch_t oldest
, int nreceived
, bool skip_maps
);
1730 void note_down_osd(int osd
);
1731 void note_up_osd(int osd
);
1732 friend struct C_OnMapCommit
;
1737 ThreadPool::TPHandle
&handle
,
1740 void activate_map();
1742 // osd map cache (past osd maps)
1743 OSDMapRef
get_map(epoch_t e
) {
1744 return service
.get_map(e
);
1746 OSDMapRef
add_map(OSDMap
*o
) {
1747 return service
.add_map(o
);
1749 bool get_map_bl(epoch_t e
, ceph::buffer::list
& bl
) {
1750 return service
.get_map_bl(e
, bl
);
1755 std::vector
<OSDShard
*> shards
;
1756 uint32_t num_shards
= 0;
1758 void inc_num_pgs() {
1761 void dec_num_pgs() {
1764 int get_num_pgs() const {
1769 ceph::mutex merge_lock
= ceph::make_mutex("OSD::merge_lock");
1770 /// merge epoch -> target pgid -> source pgid -> pg
1771 std::map
<epoch_t
,std::map
<spg_t
,std::map
<spg_t
,PGRef
>>> merge_waiters
;
1773 bool add_merge_waiter(OSDMapRef nextmap
, spg_t target
, PGRef source
,
1776 // -- placement groups --
1777 std::atomic
<size_t> num_pgs
= {0};
1779 std::mutex pending_creates_lock
;
1780 using create_from_osd_t
= std::pair
<spg_t
, bool /* is primary*/>;
1781 std::set
<create_from_osd_t
> pending_creates_from_osd
;
1782 unsigned pending_creates_from_mon
= 0;
1784 PGRecoveryStats pg_recovery_stats
;
1786 PGRef
_lookup_pg(spg_t pgid
);
1787 PGRef
_lookup_lock_pg(spg_t pgid
);
1788 void register_pg(PGRef pg
);
1789 bool try_finish_pg_delete(PG
*pg
, unsigned old_pg_num
);
1791 void _get_pgs(std::vector
<PGRef
> *v
, bool clear_too
=false);
1792 void _get_pgids(std::vector
<spg_t
> *v
);
1795 PGRef
lookup_lock_pg(spg_t pgid
);
1797 std::set
<int64_t> get_mapped_pools();
1800 PG
* _make_pg(OSDMapRef createmap
, spg_t pgid
);
1802 bool maybe_wait_for_max_pg(const OSDMapRef
& osdmap
,
1803 spg_t pgid
, bool is_mon_create
);
1804 void resume_creating_pg();
1808 /// build initial pg history and intervals on create
1809 void build_initial_pg_history(
1812 utime_t created_stamp
,
1816 epoch_t last_pg_create_epoch
;
1818 void handle_pg_create(OpRequestRef op
);
1822 const std::set
<spg_t
> &childpgids
, std::set
<PGRef
> *out_pgs
,
1826 void _finish_splits(std::set
<PGRef
>& pgs
);
1828 // == monitor interaction ==
1829 ceph::mutex mon_report_lock
= ceph::make_mutex("OSD::mon_report_lock");
1830 utime_t last_mon_report
;
1831 Finisher boot_finisher
;
1835 void _got_mon_epochs(epoch_t oldest
, epoch_t newest
);
1836 void _preboot(epoch_t oldest
, epoch_t newest
);
1838 void _collect_metadata(std::map
<std::string
,std::string
> *pmeta
);
1839 void _get_purged_snaps();
1840 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply
*r
);
1842 void start_waiting_for_healthy();
1845 void send_full_update();
1847 friend struct CB_OSD_GetVersion
;
1850 epoch_t up_thru_wanted
;
1852 void queue_want_up_thru(epoch_t want
);
1855 // -- full map requests --
1856 epoch_t requested_full_first
, requested_full_last
;
1858 void request_full_map(epoch_t first
, epoch_t last
);
1859 void rerequest_full_maps() {
1860 epoch_t first
= requested_full_first
;
1861 epoch_t last
= requested_full_last
;
1862 requested_full_first
= 0;
1863 requested_full_last
= 0;
1864 request_full_map(first
, last
);
1866 void got_full_map(epoch_t e
);
1869 std::map
<int,utime_t
> failure_queue
;
1870 std::map
<int,std::pair
<utime_t
,entity_addrvec_t
> > failure_pending
;
1872 void requeue_failures();
1873 void send_failures();
1874 void send_still_alive(epoch_t epoch
, int osd
, const entity_addrvec_t
&addrs
);
1875 void cancel_pending_failures();
1877 ceph::coarse_mono_clock::time_point last_sent_beacon
;
1878 ceph::mutex min_last_epoch_clean_lock
= ceph::make_mutex("OSD::min_last_epoch_clean_lock");
1879 epoch_t min_last_epoch_clean
= 0;
1880 // which pgs were scanned for min_lec
1881 std::vector
<pg_t
> min_last_epoch_clean_pgs
;
1882 void send_beacon(const ceph::coarse_mono_clock::time_point
& now
);
1884 ceph_tid_t
get_tid() {
1885 return service
.get_tid();
1888 // -- generic pg peering --
1889 void dispatch_context(PeeringCtx
&ctx
, PG
*pg
, OSDMapRef curmap
,
1890 ThreadPool::TPHandle
*handle
= NULL
);
1892 bool require_mon_peer(const Message
*m
);
1893 bool require_mon_or_mgr_peer(const Message
*m
);
1894 bool require_osd_peer(const Message
*m
);
1896 * Verifies that we were alive in the given epoch, and that
1899 bool require_self_aliveness(const Message
*m
, epoch_t alive_since
);
1901 * Verifies that the OSD who sent the given op has the same
1902 * address as in the given std::map.
1903 * @pre op was sent by an OSD using the cluster messenger
1905 bool require_same_peer_instance(const Message
*m
, const OSDMapRef
& map
,
1906 bool is_fast_dispatch
);
1908 bool require_same_or_newer_map(OpRequestRef
& op
, epoch_t e
,
1909 bool is_fast_dispatch
);
1911 void handle_fast_pg_create(MOSDPGCreate2
*m
);
1912 void handle_pg_query_nopg(const MQuery
& q
);
1913 void handle_fast_pg_notify(MOSDPGNotify
*m
);
1914 void handle_pg_notify_nopg(const MNotifyRec
& q
);
1915 void handle_fast_pg_info(MOSDPGInfo
*m
);
1916 void handle_fast_pg_remove(MOSDPGRemove
*m
);
1920 PGRef
handle_pg_create_info(const OSDMapRef
& osdmap
, const PGCreateInfo
*info
);
1923 void handle_fast_force_recovery(MOSDForceRecovery
*m
);
1926 void handle_command(class MCommand
*m
);
1929 // -- pg recovery --
1930 void do_recovery(PG
*pg
, epoch_t epoch_queued
, uint64_t pushes_reserved
,
1931 ThreadPool::TPHandle
&handle
);
1936 void resched_all_scrubs();
1937 bool scrub_random_backoff();
1939 // -- status reporting --
1940 MPGStats
*collect_pg_stats();
1941 std::vector
<DaemonHealthMetric
> get_health_metrics();
1945 bool ms_can_fast_dispatch_any() const override
{ return true; }
1946 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1947 switch (m
->get_type()) {
1949 case CEPH_MSG_OSD_OP
:
1950 case CEPH_MSG_OSD_BACKOFF
:
1951 case MSG_OSD_SCRUB2
:
1952 case MSG_OSD_FORCE_RECOVERY
:
1953 case MSG_MON_COMMAND
:
1954 case MSG_OSD_PG_CREATE2
:
1955 case MSG_OSD_PG_QUERY
:
1956 case MSG_OSD_PG_QUERY2
:
1957 case MSG_OSD_PG_INFO
:
1958 case MSG_OSD_PG_INFO2
:
1959 case MSG_OSD_PG_NOTIFY
:
1960 case MSG_OSD_PG_NOTIFY2
:
1961 case MSG_OSD_PG_LOG
:
1962 case MSG_OSD_PG_TRIM
:
1963 case MSG_OSD_PG_REMOVE
:
1964 case MSG_OSD_BACKFILL_RESERVE
:
1965 case MSG_OSD_RECOVERY_RESERVE
:
1967 case MSG_OSD_REPOPREPLY
:
1968 case MSG_OSD_PG_PUSH
:
1969 case MSG_OSD_PG_PULL
:
1970 case MSG_OSD_PG_PUSH_REPLY
:
1971 case MSG_OSD_PG_SCAN
:
1972 case MSG_OSD_PG_BACKFILL
:
1973 case MSG_OSD_PG_BACKFILL_REMOVE
:
1974 case MSG_OSD_EC_WRITE
:
1975 case MSG_OSD_EC_WRITE_REPLY
:
1976 case MSG_OSD_EC_READ
:
1977 case MSG_OSD_EC_READ_REPLY
:
1978 case MSG_OSD_SCRUB_RESERVE
:
1979 case MSG_OSD_REP_SCRUB
:
1980 case MSG_OSD_REP_SCRUBMAP
:
1981 case MSG_OSD_PG_UPDATE_LOG_MISSING
:
1982 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
:
1983 case MSG_OSD_PG_RECOVERY_DELETE
:
1984 case MSG_OSD_PG_RECOVERY_DELETE_REPLY
:
1985 case MSG_OSD_PG_LEASE
:
1986 case MSG_OSD_PG_LEASE_ACK
:
1992 void ms_fast_dispatch(Message
*m
) override
;
1993 bool ms_dispatch(Message
*m
) override
;
1994 void ms_handle_connect(Connection
*con
) override
;
1995 void ms_handle_fast_connect(Connection
*con
) override
;
1996 void ms_handle_fast_accept(Connection
*con
) override
;
1997 int ms_handle_authentication(Connection
*con
) override
;
1998 bool ms_handle_reset(Connection
*con
) override
;
1999 void ms_handle_remote_reset(Connection
*con
) override
{}
2000 bool ms_handle_refused(Connection
*con
) override
;
2003 /* internal and external can point to the same messenger, they will still
2004 * be cleaned up properly*/
2005 OSD(CephContext
*cct_
,
2006 std::unique_ptr
<ObjectStore
> store_
,
2008 Messenger
*internal
,
2009 Messenger
*external
,
2010 Messenger
*hb_front_client
,
2011 Messenger
*hb_back_client
,
2012 Messenger
*hb_front_server
,
2013 Messenger
*hb_back_server
,
2014 Messenger
*osdc_messenger
,
2015 MonClient
*mc
, const std::string
&dev
, const std::string
&jdev
,
2016 ceph::async::io_context_pool
& poolctx
);
2020 static int mkfs(CephContext
*cct
,
2021 std::unique_ptr
<ObjectStore
> store
,
2024 std::string osdspec_affinity
);
2026 /* remove any non-user xattrs from a std::map of them */
2027 void filter_xattrs(std::map
<std::string
, ceph::buffer::ptr
>& attrs
) {
2028 for (std::map
<std::string
, ceph::buffer::ptr
>::iterator iter
= attrs
.begin();
2029 iter
!= attrs
.end();
2031 if (('_' != iter
->first
.at(0)) || (iter
->first
.size() == 1))
2032 attrs
.erase(iter
++);
2038 int mon_cmd_maybe_osd_create(std::string
&cmd
);
2039 int update_crush_device_class();
2040 int update_crush_location();
2042 static int write_meta(CephContext
*cct
,
2044 uuid_d
& cluster_fsid
, uuid_d
& osd_fsid
, int whoami
, std::string
& osdspec_affinity
);
2046 void handle_scrub(class MOSDScrub
*m
);
2047 void handle_fast_scrub(class MOSDScrub2
*m
);
2048 void handle_osd_ping(class MOSDPing
*m
);
2050 size_t get_num_cache_shards();
2051 int get_num_op_shards();
2052 int get_num_op_threads();
2054 float get_osd_recovery_sleep();
2055 float get_osd_delete_sleep();
2056 float get_osd_snap_trim_sleep();
2058 int get_recovery_max_active();
2059 void maybe_override_max_osd_capacity_for_qos();
2060 bool maybe_override_options_for_qos();
2061 int run_osd_bench_test(int64_t count
,
2067 int mon_cmd_set_config(const std::string
&key
, const std::string
&val
);
2068 bool unsupported_objstore_for_qos();
2070 void scrub_purged_snaps();
2071 void probe_smart(const std::string
& devid
, std::ostream
& ss
);
2074 static int peek_meta(ObjectStore
*store
,
2076 uuid_d
*cluster_fsid
,
2079 ceph_release_t
*min_osd_release
);
2087 int enable_disable_fuse(bool stop
);
2088 int set_numa_affinity();
2090 void suicide(int exitcode
);
2093 void handle_signal(int signum
);
2095 /// check if we can throw out op from a disconnected client
2096 static bool op_is_discardable(const MOSDOp
*m
);
2100 friend class OSDService
;
2103 void set_perf_queries(const ConfigPayload
&config_payload
);
2104 MetricPayload
get_perf_reports();
2106 ceph::mutex m_perf_queries_lock
= ceph::make_mutex("OSD::m_perf_queries_lock");
2107 std::list
<OSDPerfMetricQuery
> m_perf_queries
;
2108 std::map
<OSDPerfMetricQuery
, OSDPerfMetricLimits
> m_perf_limits
;
2112 //compatibility of the executable
2113 extern const CompatSet::Feature ceph_osd_feature_compat
[];
2114 extern const CompatSet::Feature ceph_osd_feature_ro_compat
[];
2115 extern const CompatSet::Feature ceph_osd_feature_incompat
[];
2117 #endif // CEPH_OSD_H