1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_H
16 #define CEPH_OSD_H
17
18 #include "PG.h"
19
20 #include "msg/Dispatcher.h"
21
22 #include "common/async/context_pool.h"
23 #include "common/Timer.h"
24 #include "common/WorkQueue.h"
25 #include "common/AsyncReserver.h"
26 #include "common/ceph_context.h"
27 #include "common/config_cacher.h"
28 #include "common/zipkin_trace.h"
29 #include "common/ceph_timer.h"
30
31 #include "mgr/MgrClient.h"
32
33 #include "os/ObjectStore.h"
34
35 #include "include/CompatSet.h"
36 #include "include/common_fwd.h"
37
38 #include "OpRequest.h"
39 #include "Session.h"
40
41 #include "osd/scheduler/OpScheduler.h"
42
43 #include <atomic>
44 #include <map>
45 #include <memory>
46 #include <string>
47
48 #include "include/unordered_map.h"
49
50 #include "common/shared_cache.hpp"
51 #include "common/simple_cache.hpp"
52 #include "messages/MOSDOp.h"
53 #include "common/EventTrace.h"
54 #include "osd/osd_perf_counters.h"
55 #include "common/Finisher.h"
56
57 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
58
59 /*
60
61 lock ordering for pg map
62
63 PG::lock
64 ShardData::lock
65 OSD::pg_map_lock
66
67 */
68
69 class Messenger;
70 class Message;
71 class MonClient;
72 class ObjectStore;
73 class FuseStore;
74 class OSDMap;
75 class MLog;
76 class Objecter;
77 class KeyStore;
78
79 class Watch;
80 class PrimaryLogPG;
81
82 class TestOpsSocketHook;
83 struct C_FinishSplits;
84 struct C_OpenPGs;
85 class LogChannel;
86
87 class MOSDPGCreate2;
88 class MOSDPGQuery;
89 class MOSDPGNotify;
90 class MOSDPGInfo;
91 class MOSDPGRemove;
92 class MOSDForceRecovery;
93 class MMonGetPurgedSnapsReply;
94
95 class OSD;
96
97 class OSDService {
98 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
99 public:
100 OSD *osd;
101 CephContext *cct;
102 ObjectStore::CollectionHandle meta_ch;
103 const int whoami;
104 ObjectStore *&store;
105 LogClient &log_client;
106 LogChannelRef clog;
107 PGRecoveryStats &pg_recovery_stats;
108 private:
109 Messenger *&cluster_messenger;
110 Messenger *&client_messenger;
111 public:
112 PerfCounters *&logger;
113 PerfCounters *&recoverystate_perf;
114 MonClient *&monc;
115
116 md_config_cacher_t<Option::size_t> osd_max_object_size;
117 md_config_cacher_t<bool> osd_skip_data_digest;
118
119 void enqueue_back(OpSchedulerItem&& qi);
120 void enqueue_front(OpSchedulerItem&& qi);
121
122 void maybe_inject_dispatch_delay() {
123 if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
124 if (rand() % 10000 <
125 g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
126 utime_t t;
127 t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
128 t.sleep();
129 }
130 }
131 }
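// Worked example (illustrative only): with osd_debug_inject_dispatch_delay_probability = 0.1
// and osd_debug_inject_dispatch_delay_duration = 0.05, the check above becomes
// rand() % 10000 < 1000, so roughly one in ten dispatched messages sleeps for 50 ms.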
132
133 ceph::signedspan get_mnow();
134
135 private:
136 // -- superblock --
137 ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
138 OSDSuperblock superblock;
139
140 public:
141 OSDSuperblock get_superblock() {
142 std::lock_guard l(publish_lock);
143 return superblock;
144 }
145 void publish_superblock(const OSDSuperblock &block) {
146 std::lock_guard l(publish_lock);
147 superblock = block;
148 }
149
150 int get_nodeid() const { return whoami; }
151
152 std::atomic<epoch_t> max_oldest_map;
153 private:
154 OSDMapRef osdmap;
155
156 public:
157 OSDMapRef get_osdmap() {
158 std::lock_guard l(publish_lock);
159 return osdmap;
160 }
161 epoch_t get_osdmap_epoch() {
162 std::lock_guard l(publish_lock);
163 return osdmap ? osdmap->get_epoch() : 0;
164 }
165 void publish_map(OSDMapRef map) {
166 std::lock_guard l(publish_lock);
167 osdmap = map;
168 }
169
170 /*
171 * osdmap - current published map
172 * next_osdmap - pre-published map that is about to be published.
173 *
174 * We use the next_osdmap to send messages and initiate connections,
175 * but only if the target is the same instance as the one in the map
176 * epoch the current user is working from (i.e., the result is
177 * equivalent to what is in next_osdmap).
178 *
179 * This allows the helpers to start ignoring osds that are about to
180 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
181 * down, without worrying about reopening connections from threads
182 * working from old maps.
183 */
184 private:
185 OSDMapRef next_osdmap;
186 ceph::condition_variable pre_publish_cond;
187 int pre_publish_waiter = 0;
188
189 public:
190 void pre_publish_map(OSDMapRef map) {
191 std::lock_guard l(pre_publish_lock);
192 next_osdmap = std::move(map);
193 }
194
195 void activate_map();
196 /// map epochs reserved below
197 std::map<epoch_t, unsigned> map_reservations;
198
199 /// gets ref to next_osdmap and registers the epoch as reserved
200 OSDMapRef get_nextmap_reserved() {
201 std::lock_guard l(pre_publish_lock);
202 epoch_t e = next_osdmap->get_epoch();
203 std::map<epoch_t, unsigned>::iterator i =
204 map_reservations.insert(std::make_pair(e, 0)).first;
205 i->second++;
206 return next_osdmap;
207 }
208 /// releases reservation on map
209 void release_map(OSDMapRef osdmap) {
210 std::lock_guard l(pre_publish_lock);
211 std::map<epoch_t, unsigned>::iterator i =
212 map_reservations.find(osdmap->get_epoch());
213 ceph_assert(i != map_reservations.end());
214 ceph_assert(i->second > 0);
215 if (--(i->second) == 0) {
216 map_reservations.erase(i);
217 }
218 if (pre_publish_waiter) {
219 pre_publish_cond.notify_all();
220 }
221 }
222 /// blocks until there are no reserved maps prior to next_osdmap
223 void await_reserved_maps() {
224 std::unique_lock l{pre_publish_lock};
225 ceph_assert(next_osdmap);
226 pre_publish_waiter++;
227 pre_publish_cond.wait(l, [this] {
228 auto i = map_reservations.cbegin();
229 return (i == map_reservations.cend() ||
230 i->first >= next_osdmap->get_epoch());
231 });
232 pre_publish_waiter--;
233 }
234 OSDMapRef get_next_osdmap() {
235 std::lock_guard l(pre_publish_lock);
236 return next_osdmap;
237 }
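/*
 * Illustrative sketch (an assumption, not code from this class): the
 * reserve/release pair above is intended to bracket message sends against
 * the pre-published map, e.g.
 *
 *   OSDMapRef nextmap = service.get_nextmap_reserved();   // pins this epoch
 *   ConnectionRef con = service.get_con_osd_cluster(peer, nextmap->get_epoch());
 *   if (con) {
 *     service.send_message_osd_cluster(m, con);
 *   }
 *   service.release_map(nextmap);                         // drop the reservation
 *
 * await_reserved_maps() then lets the publisher block until every reservation
 * taken against an epoch older than next_osdmap has been released.
 */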
238
239 void maybe_share_map(Connection *con,
240 const OSDMapRef& osdmap,
241 epoch_t peer_epoch_lb=0);
242
243 void send_map(class MOSDMap *m, Connection *con);
244 void send_incremental_map(epoch_t since, Connection *con,
245 const OSDMapRef& osdmap);
246 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
247 OSDSuperblock& superblock);
248
249 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
250 std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
251 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
252 void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
253 void send_message_osd_cluster(MessageRef m, Connection *con) {
254 con->send_message2(std::move(m));
255 }
256 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
257 con->send_message(m);
258 }
259 void send_message_osd_client(Message *m, const ConnectionRef& con) {
260 con->send_message(m);
261 }
262 entity_name_t get_cluster_msgr_name() const;
263
264 private:
265 // -- scrub scheduling --
266 ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
267 int scrubs_local;
268 int scrubs_remote;
269
270 public:
271 struct ScrubJob {
272 CephContext* cct;
273 /// pg to be scrubbed
274 spg_t pgid;
276 /// the time scheduled for the scrub; the scrub could be delayed if system
277 /// load is too high or if it fails to fall within the permitted scrub hours
277 utime_t sched_time;
278 /// the hard upper bound of scrub time
279 utime_t deadline;
280 ScrubJob() : cct(nullptr) {}
281 explicit ScrubJob(CephContext* cct, const spg_t& pg,
282 const utime_t& timestamp,
283 double pool_scrub_min_interval = 0,
284 double pool_scrub_max_interval = 0, bool must = true);
285 /// order the jobs by sched_time
286 bool operator<(const ScrubJob& rhs) const;
287 };
288 std::set<ScrubJob> sched_scrub_pg;
289
290 /// @returns the scrub_reg_stamp used for unregistering the scrub job
291 utime_t reg_pg_scrub(spg_t pgid,
292 utime_t t,
293 double pool_scrub_min_interval,
294 double pool_scrub_max_interval,
295 bool must) {
296 ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
297 must);
298 std::lock_guard l(OSDService::sched_scrub_lock);
299 sched_scrub_pg.insert(scrub_job);
300 return scrub_job.sched_time;
301 }
302
303 void unreg_pg_scrub(spg_t pgid, utime_t t) {
304 std::lock_guard l(sched_scrub_lock);
305 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
306 ceph_assert(removed);
307 }
308
309 bool first_scrub_stamp(ScrubJob *out) {
310 std::lock_guard l(sched_scrub_lock);
311 if (sched_scrub_pg.empty())
312 return false;
313 std::set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
314 *out = *iter;
315 return true;
316 }
317 bool next_scrub_stamp(const ScrubJob& next,
318 ScrubJob *out) {
319 std::lock_guard l(sched_scrub_lock);
320 if (sched_scrub_pg.empty())
321 return false;
322 std::set<ScrubJob>::const_iterator iter = sched_scrub_pg.upper_bound(next);
323 if (iter == sched_scrub_pg.cend())
324 return false;
325 *out = *iter;
326 return true;
327 }
328
329 void dumps_scrub(ceph::Formatter* f);
330
331 bool can_inc_scrubs();
332 bool inc_scrubs_local();
333 void dec_scrubs_local();
334 bool inc_scrubs_remote();
335 void dec_scrubs_remote();
336 void dump_scrub_reservations(ceph::Formatter *f);
337
338 void reply_op_error(OpRequestRef op, int err);
339 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
340 std::vector<pg_log_op_return_item_t> op_returns);
341 void handle_misdirected_op(PG *pg, OpRequestRef op);
342
343
344 private:
345 // -- agent shared state --
346 ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
347 ceph::condition_variable agent_cond;
348 std::map<uint64_t, std::set<PGRef> > agent_queue;
349 std::set<PGRef>::iterator agent_queue_pos;
350 bool agent_valid_iterator;
351 int agent_ops;
352 int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
353 std::set<hobject_t> agent_oids;
354 bool agent_active;
355 struct AgentThread : public Thread {
356 OSDService *osd;
357 explicit AgentThread(OSDService *o) : osd(o) {}
358 void *entry() override {
359 osd->agent_entry();
360 return NULL;
361 }
362 } agent_thread;
363 bool agent_stop_flag;
364 ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
365 SafeTimer agent_timer;
366
367 public:
368 void agent_entry();
369 void agent_stop();
370
371 void _enqueue(PG *pg, uint64_t priority) {
372 if (!agent_queue.empty() &&
373 agent_queue.rbegin()->first < priority)
374 agent_valid_iterator = false; // inserting into a higher-priority queue
375 std::set<PGRef>& nq = agent_queue[priority];
376 if (nq.empty())
377 agent_cond.notify_all();
378 nq.insert(pg);
379 }
380
381 void _dequeue(PG *pg, uint64_t old_priority) {
382 std::set<PGRef>& oq = agent_queue[old_priority];
383 std::set<PGRef>::iterator p = oq.find(pg);
384 ceph_assert(p != oq.end());
385 if (p == agent_queue_pos)
386 ++agent_queue_pos;
387 oq.erase(p);
388 if (oq.empty()) {
389 if (agent_queue.rbegin()->first == old_priority)
390 agent_valid_iterator = false;
391 agent_queue.erase(old_priority);
392 }
393 }
394
395 /// enable agent for a pg
396 void agent_enable_pg(PG *pg, uint64_t priority) {
397 std::lock_guard l(agent_lock);
398 _enqueue(pg, priority);
399 }
400
401 /// adjust priority for an enabled pg
402 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
403 std::lock_guard l(agent_lock);
404 ceph_assert(new_priority != old_priority);
405 _enqueue(pg, new_priority);
406 _dequeue(pg, old_priority);
407 }
408
409 /// disable agent for a pg
410 void agent_disable_pg(PG *pg, uint64_t old_priority) {
411 std::lock_guard l(agent_lock);
412 _dequeue(pg, old_priority);
413 }
414
415 /// note start of an async (evict) op
416 void agent_start_evict_op() {
417 std::lock_guard l(agent_lock);
418 ++agent_ops;
419 }
420
421 /// note finish or cancellation of an async (evict) op
422 void agent_finish_evict_op() {
423 std::lock_guard l(agent_lock);
424 ceph_assert(agent_ops > 0);
425 --agent_ops;
426 agent_cond.notify_all();
427 }
428
429 /// note start of an async (flush) op
430 void agent_start_op(const hobject_t& oid) {
431 std::lock_guard l(agent_lock);
432 ++agent_ops;
433 ceph_assert(agent_oids.count(oid) == 0);
434 agent_oids.insert(oid);
435 }
436
437 /// note finish or cancellation of an async (flush) op
438 void agent_finish_op(const hobject_t& oid) {
439 std::lock_guard l(agent_lock);
440 ceph_assert(agent_ops > 0);
441 --agent_ops;
442 ceph_assert(agent_oids.count(oid) == 1);
443 agent_oids.erase(oid);
444 agent_cond.notify_all();
445 }
446
447 /// check if we are operating on an object
448 bool agent_is_active_oid(const hobject_t& oid) {
449 std::lock_guard l(agent_lock);
450 return agent_oids.count(oid);
451 }
452
453 /// get count of active agent ops
454 int agent_get_num_ops() {
455 std::lock_guard l(agent_lock);
456 return agent_ops;
457 }
458
459 void agent_inc_high_count() {
460 std::lock_guard l(agent_lock);
461 flush_mode_high_count ++;
462 }
463
464 void agent_dec_high_count() {
465 std::lock_guard l(agent_lock);
466 flush_mode_high_count --;
467 }
468
469 private:
470 /// throttle promotion attempts
471 std::atomic<unsigned int> promote_probability_millis{1000}; ///< promote probability, in thousandths; a single word so it can be read locklessly
472 PromoteCounter promote_counter;
473 utime_t last_recalibrate;
474 unsigned long promote_max_objects, promote_max_bytes;
475
476 public:
477 bool promote_throttle() {
478 // NOTE: lockless! we rely on the probability being a single word.
479 promote_counter.attempt();
480 if ((unsigned)rand() % 1000 > promote_probability_millis)
481 return true; // yes throttle (no promote)
482 if (promote_max_objects &&
483 promote_counter.objects > promote_max_objects)
484 return true; // yes throttle
485 if (promote_max_bytes &&
486 promote_counter.bytes > promote_max_bytes)
487 return true; // yes throttle
488 return false; // no throttle (promote)
489 }
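// Worked example (illustrative): with promote_probability_millis == 250 the check
// "rand() % 1000 > 250" is true about 75% of the time, so roughly three out of four
// promotion attempts are throttled; with the initial value of 1000 the probability
// check never throttles and only the promote_max_objects/promote_max_bytes caps apply.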
490 void promote_finish(uint64_t bytes) {
491 promote_counter.finish(bytes);
492 }
493 void promote_throttle_recalibrate();
494 unsigned get_num_shards() const {
495 return m_objecter_finishers;
496 }
497 Finisher* get_objecter_finisher(int shard) {
498 return objecter_finishers[shard].get();
499 }
500
501 // -- Objecter, for tiering reads/writes from/to other OSDs --
502 ceph::async::io_context_pool& poolctx;
503 std::unique_ptr<Objecter> objecter;
504 int m_objecter_finishers;
505 std::vector<std::unique_ptr<Finisher>> objecter_finishers;
506
507 // -- Watch --
508 ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
509 SafeTimer watch_timer;
510 uint64_t next_notif_id;
511 uint64_t get_next_id(epoch_t cur_epoch) {
512 std::lock_guard l(watch_lock);
513 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
514 }
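// Worked example (illustrative): for cur_epoch 0x2a and next_notif_id 5 this returns
// 0x0000002a00000005 -- keeping the epoch in the high 32 bits prevents ids issued under
// different epochs (e.g. after an OSD restart resets next_notif_id) from colliding.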
515
516 // -- Recovery/Backfill Request Scheduling --
517 ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
518 SafeTimer recovery_request_timer;
519
520 // For async recovery sleep
521 bool recovery_needs_sleep = true;
522 ceph::real_clock::time_point recovery_schedule_time;
523
524 // For recovery & scrub & snap
525 ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
526 SafeTimer sleep_timer;
527
528 // -- tids --
529 // for ops i issue
530 std::atomic<unsigned int> last_tid{0};
531 ceph_tid_t get_tid() {
532 return (ceph_tid_t)last_tid++;
533 }
534
535 // -- backfill_reservation --
536 Finisher reserver_finisher;
537 AsyncReserver<spg_t, Finisher> local_reserver;
538 AsyncReserver<spg_t, Finisher> remote_reserver;
539
540 // -- pg merge --
541 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
542 std::map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
543 std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
544 std::set<pg_t> not_ready_to_merge_source;
545 std::map<pg_t,pg_t> not_ready_to_merge_target;
546 std::set<pg_t> sent_ready_to_merge_source;
547
548 void set_ready_to_merge_source(PG *pg,
549 eversion_t version);
550 void set_ready_to_merge_target(PG *pg,
551 eversion_t version,
552 epoch_t last_epoch_started,
553 epoch_t last_epoch_clean);
554 void set_not_ready_to_merge_source(pg_t source);
555 void set_not_ready_to_merge_target(pg_t target, pg_t source);
556 void clear_ready_to_merge(PG *pg);
557 void send_ready_to_merge();
558 void _send_ready_to_merge();
559 void clear_sent_ready_to_merge();
560 void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
561
562 // -- pg_temp --
563 private:
564 ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
565 struct pg_temp_t {
566 std::vector<int> acting;
567 bool forced = false;
568 };
569 std::map<pg_t, pg_temp_t> pg_temp_wanted;
570 std::map<pg_t, pg_temp_t> pg_temp_pending;
571 void _sent_pg_temp();
572 friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
573 public:
574 void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
575 bool forced = false);
576 void remove_want_pg_temp(pg_t pgid);
577 void requeue_pg_temp();
578 void send_pg_temp();
579
580 ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
581 std::set<pg_t> pg_created;
582 void send_pg_created(pg_t pgid);
583 void prune_pg_created();
584 void send_pg_created();
585
586 AsyncReserver<spg_t, Finisher> snap_reserver;
587 void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
588 void queue_for_snap_trim(PG *pg);
589 void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
590 void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
591
592 /// queue the message (-> event) that all replicas reserved scrub resources for us
593 void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
594
595 /// queue the message (-> event) that some replicas denied our scrub resources request
596 void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);
597
598 /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
599 /// of the primary map being created by the backend.
600 void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);
601
602 /// Signals a change in the number of in-flight recovery writes
603 void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);
604
605 /// Signals that all pending updates were applied
606 void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
607
609 /// The blocked object range that was preventing the scrub has been freed
609 void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
610
611 /// Signals that all write OPs are done
612 void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
613
614 /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
615 void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
616
617 void queue_for_rep_scrub(PG* pg,
618 Scrub::scrub_prio_t with_high_priority,
619 unsigned int qu_priority);
620
621 /// Signals a change in the number of in-flight recovery writes
622 void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
623
624 void queue_for_rep_scrub_resched(PG* pg,
625 Scrub::scrub_prio_t with_high_priority,
626 unsigned int qu_priority);
627
628 void queue_for_pg_delete(spg_t pgid, epoch_t e);
629 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
630
631 private:
632 // -- pg recovery and associated throttling --
633 ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
634 std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;
635
636 /// queue a scrub-related message for a PG
637 template <class MSG_TYPE>
638 void queue_scrub_event_msg(PG* pg,
639 Scrub::scrub_prio_t with_priority,
640 unsigned int qu_priority);
641
642 /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
643 /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
644 template <class MSG_TYPE>
645 void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);
646
647 utime_t defer_recovery_until;
648 uint64_t recovery_ops_active;
649 uint64_t recovery_ops_reserved;
650 bool recovery_paused;
651 #ifdef DEBUG_RECOVERY_OIDS
652 std::map<spg_t, std::set<hobject_t> > recovery_oids;
653 #endif
654 bool _recover_now(uint64_t *available_pushes);
655 void _maybe_queue_recovery();
656 void _queue_for_recovery(
657 std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
658 public:
659 void start_recovery_op(PG *pg, const hobject_t& soid);
660 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
661 bool is_recovery_active();
662 void release_reserved_pushes(uint64_t pushes);
663 void defer_recovery(float defer_for) {
664 defer_recovery_until = ceph_clock_now();
665 defer_recovery_until += defer_for;
666 }
667 void pause_recovery() {
668 std::lock_guard l(recovery_lock);
669 recovery_paused = true;
670 }
671 bool recovery_is_paused() {
672 std::lock_guard l(recovery_lock);
673 return recovery_paused;
674 }
675 void unpause_recovery() {
676 std::lock_guard l(recovery_lock);
677 recovery_paused = false;
678 _maybe_queue_recovery();
679 }
680 void kick_recovery_queue() {
681 std::lock_guard l(recovery_lock);
682 _maybe_queue_recovery();
683 }
684 void clear_queued_recovery(PG *pg) {
685 std::lock_guard l(recovery_lock);
686 awaiting_throttle.remove_if(
687 [pg](decltype(awaiting_throttle)::const_reference awaiting ) {
688 return awaiting.second.get() == pg;
689 });
690 }
691
692 unsigned get_target_pg_log_entries() const;
693
694 // delayed pg activation
695 void queue_for_recovery(PG *pg) {
696 std::lock_guard l(recovery_lock);
697
698 if (pg->is_forced_recovery_or_backfill()) {
699 awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
700 } else {
701 awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
702 }
703 _maybe_queue_recovery();
704 }
705 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
706 std::lock_guard l(recovery_lock);
707 _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
708 }
709
710 void queue_check_readable(spg_t spgid,
711 epoch_t lpr,
712 ceph::signedspan delay = ceph::signedspan::zero());
713
714 // osd map cache (past osd maps)
715 ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
716 SharedLRU<epoch_t, const OSDMap> map_cache;
717 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
718 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;
719
720 OSDMapRef try_get_map(epoch_t e);
721 OSDMapRef get_map(epoch_t e) {
722 OSDMapRef ret(try_get_map(e));
723 ceph_assert(ret);
724 return ret;
725 }
726 OSDMapRef add_map(OSDMap *o) {
727 std::lock_guard l(map_cache_lock);
728 return _add_map(o);
729 }
730 OSDMapRef _add_map(OSDMap *o);
731
732 void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
733 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
734 std::lock_guard l(map_cache_lock);
735 return _get_map_bl(e, bl);
736 }
737 bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);
738
739 void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
740 bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);
741
742 /// identify split child pgids over an osdmap interval
743 void identify_splits_and_merges(
744 OSDMapRef old_map,
745 OSDMapRef new_map,
746 spg_t pgid,
747 std::set<std::pair<spg_t,epoch_t>> *new_children,
748 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
749
750 void need_heartbeat_peer_update();
751
752 void init();
753 void final_init();
754 void start_shutdown();
755 void shutdown_reserver();
756 void shutdown();
757
758 // -- stats --
759 ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
760 osd_stat_t osd_stat;
761 uint32_t seq = 0;
762
763 void set_statfs(const struct store_statfs_t &stbuf,
764 osd_alert_list_t& alerts);
765 osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
766 void inc_osd_stat_repaired(void);
767 float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
768 osd_stat_t get_osd_stat() {
769 std::lock_guard l(stat_lock);
770 ++seq;
771 osd_stat.up_from = up_epoch;
772 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
773 return osd_stat;
774 }
775 uint64_t get_osd_stat_seq() {
776 std::lock_guard l(stat_lock);
777 return osd_stat.seq;
778 }
779 void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
780 {
781 std::lock_guard l(stat_lock);
782 *pp = osd_stat.hb_pingtime;
783 return;
784 }
785
786 // -- OSD Full Status --
787 private:
788 friend TestOpsSocketHook;
789 mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
790 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
791 const char *get_full_state_name(s_names s) const {
792 switch (s) {
793 case NONE: return "none";
794 case NEARFULL: return "nearfull";
795 case BACKFILLFULL: return "backfillfull";
796 case FULL: return "full";
797 case FAILSAFE: return "failsafe";
798 default: return "???";
799 }
800 }
801 s_names get_full_state(std::string type) const {
802 if (type == "none")
803 return NONE;
804 else if (type == "failsafe")
805 return FAILSAFE;
806 else if (type == "full")
807 return FULL;
808 else if (type == "backfillfull")
809 return BACKFILLFULL;
810 else if (type == "nearfull")
811 return NEARFULL;
812 else
813 return INVALID;
814 }
815 double cur_ratio, physical_ratio; ///< current utilization
816 mutable int64_t injectfull = 0;
817 s_names injectfull_state = NONE;
818 float get_failsafe_full_ratio();
819 bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
820 bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
821 public:
822 void check_full_status(float ratio, float pratio);
823 s_names recalc_full_state(float ratio, float pratio, std::string &inject);
824 bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
825 bool check_failsafe_full(DoutPrefixProvider *dpp) const;
826 bool check_full(DoutPrefixProvider *dpp) const;
827 bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
828 bool check_backfill_full(DoutPrefixProvider *dpp) const;
829 bool check_nearfull(DoutPrefixProvider *dpp) const;
830 bool is_failsafe_full() const;
831 bool is_full() const;
832 bool is_backfillfull() const;
833 bool is_nearfull() const;
834 bool need_fullness_update(); ///< osdmap state needs update
835 void set_injectfull(s_names type, int64_t count);
836
837
838 // -- epochs --
839 private:
840 // protects access to boot_epoch, up_epoch, bind_epoch
841 mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
842 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
843 epoch_t up_epoch; // _most_recent_ epoch we were marked up
844 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
845 public:
846 /**
847 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
848 * can be NULL if you don't care about them.
849 */
850 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
851 epoch_t *_bind_epoch) const;
852 /**
853 * Set the boot, up, and bind epochs. Any NULL params will not be set.
854 */
855 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
856 const epoch_t *_bind_epoch);
857 epoch_t get_boot_epoch() const {
858 epoch_t ret;
859 retrieve_epochs(&ret, NULL, NULL);
860 return ret;
861 }
862 epoch_t get_up_epoch() const {
863 epoch_t ret;
864 retrieve_epochs(NULL, &ret, NULL);
865 return ret;
866 }
867 epoch_t get_bind_epoch() const {
868 epoch_t ret;
869 retrieve_epochs(NULL, NULL, &ret);
870 return ret;
871 }
872
873 void request_osdmap_update(epoch_t e);
874
875 // -- heartbeats --
876 ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");
877
878 /// osd -> heartbeat stamps
879 std::vector<HeartbeatStampsRef> hb_stamps;
880
881 /// get or create a ref for a peer's HeartbeatStamps
882 HeartbeatStampsRef get_hb_stamps(unsigned osd);
883
884
885 // Timer for readable leases
886 ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
887
888 void queue_renew_lease(epoch_t epoch, spg_t spgid);
889
890 // -- stopping --
891 ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
892 ceph::condition_variable is_stopping_cond;
893 enum {
894 NOT_STOPPING,
895 PREPARING_TO_STOP,
896 STOPPING };
897 std::atomic<int> state{NOT_STOPPING};
898 int get_state() const {
899 return state;
900 }
901 void set_state(int s) {
902 state = s;
903 }
904 bool is_stopping() const {
905 return state == STOPPING;
906 }
907 bool is_preparing_to_stop() const {
908 return state == PREPARING_TO_STOP;
909 }
910 bool prepare_to_stop();
911 void got_stop_ack();
912
913
914 #ifdef PG_DEBUG_REFS
915 ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
916 std::map<spg_t, int> pgid_tracker;
917 std::map<spg_t, PG*> live_pgs;
918 void add_pgid(spg_t pgid, PG *pg);
919 void remove_pgid(spg_t pgid, PG *pg);
920 void dump_live_pgids();
921 #endif
922
923 explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
924 ~OSDService() = default;
925 };
926
927 /*
928
929 Each PG slot includes queues for events that are processing and/or waiting
930 for a PG to be materialized in the slot.
931
932 These are the constraints:
933
934 - client ops must remain ordered by client, regardless of map epoch
935 - peering messages/events from peers must remain ordered by peer
936 - peering messages and client ops need not be ordered relative to each other
937
938 - some peering events can create a pg (e.g., notify)
939 - the query peering event can proceed when a PG doesn't exist
940
941 Implementation notes:
942
943 - everybody waits for split. If the OSD has the parent PG it will instantiate
944 the PGSlot early and mark it waiting_for_split. Everything will wait until
945 the parent is able to commit the split operation and the child PGs are
946 materialized in the child slots.
947
948 - every event has an epoch property and will wait for the OSDShard to catch
949 up to that epoch. For example, if we get a peering event from a future
950 epoch, the event will wait in the slot until the local OSD has caught up.
951 (We should be judicious in specifying the required epoch [by, e.g., setting
952 it to the same_interval_since epoch] so that we don't wait for epochs that
953 don't affect the given PG.)
954
955 - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
956 OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
957 peering events are queued up by epoch required.
958
959 - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
960 materialized the PG), we wake *all* waiting items. (This could be optimized,
961 probably, but we don't bother.) We always requeue peering items ahead of
962 client ops.
963
964 - some peering events are marked !peering_requires_pg (PGQuery). If we do
965 not have a PG these are processed immediately (under the shard lock).
966
967 - if we do not have a PG present, we check whether the slot maps to the current host.
968 If so, we either queue the item and wait for the PG to materialize, or
969 (if the event is a pg creating event like PGNotify), we materialize the PG.
970
971 - when we advance the osdmap on the OSDShard, we scan pg slots and
972 discard any slots with no pg (and not waiting_for_split) that no
973 longer map to the current host.
974
975 */
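/*
 * Illustrative sketch (an assumption, not the actual _process() body): the
 * requeue_seq field of OSDShardPGSlot below is typically used by a worker
 * thread that must drop the shard lock to take the PG lock:
 *
 *   uint64_t requeue_seq = slot->requeue_seq;   // snapshot under shard_lock
 *   ++slot->num_running;
 *   sdata->shard_lock.unlock();
 *   pg->lock();                                 // may race with wake_pg_waiters
 *   sdata->shard_lock.lock();
 *   --slot->num_running;
 *   if (slot->requeue_seq != requeue_seq) {
 *     pg->unlock();                             // our item was requeued; bail out
 *     return;
 *   }
 */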
976
977 struct OSDShardPGSlot {
978 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
979 PGRef pg; ///< pg reference
980 std::deque<OpSchedulerItem> to_process; ///< order items for this slot
981 int num_running = 0; ///< _process threads doing pg lookup/lock
982
983 std::deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
984
985 /// waiting for map (peering evt)
986 std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;
987
988 /// incremented by wake_pg_waiters; indicates racing _process threads
989 /// should bail out (their op has been requeued)
990 uint64_t requeue_seq = 0;
991
992 /// waiting for split child to materialize in these epoch(s)
993 std::set<epoch_t> waiting_for_split;
994
995 epoch_t epoch = 0;
996 boost::intrusive::set_member_hook<> pg_epoch_item;
997
998 /// waiting for a merge (source or target) by this epoch
999 epoch_t waiting_for_merge_epoch = 0;
1000 };
1001
1002 struct OSDShard {
1003 const unsigned shard_id;
1004 CephContext *cct;
1005 OSD *osd;
1006
1007 std::string shard_name;
1008
1009 std::string sdata_wait_lock_name;
1010 ceph::mutex sdata_wait_lock;
1011 ceph::condition_variable sdata_cond;
1012 int waiting_threads = 0;
1013
1014 ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
1015 OSDMapRef shard_osdmap;
1016
1017 OSDMapRef get_osdmap() {
1018 std::lock_guard l(osdmap_lock);
1019 return shard_osdmap;
1020 }
1021
1022 std::string shard_lock_name;
1023 ceph::mutex shard_lock; ///< protects remaining members below
1024
1025 /// map of slots for each spg_t. maintains ordering of items dequeued
1026 /// from scheduler while _process thread drops shard lock to acquire the
1027 /// pg lock. stale slots are removed by consume_map.
1028 std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;
1029
1030 struct pg_slot_compare_by_epoch {
1031 bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
1032 return l.epoch < r.epoch;
1033 }
1034 };
1035
1036 /// maintain an ordering of pg slots by pg epoch
1037 boost::intrusive::multiset<
1038 OSDShardPGSlot,
1039 boost::intrusive::member_hook<
1040 OSDShardPGSlot,
1041 boost::intrusive::set_member_hook<>,
1042 &OSDShardPGSlot::pg_epoch_item>,
1043 boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
1044 int waiting_for_min_pg_epoch = 0;
1045 ceph::condition_variable min_pg_epoch_cond;
1046
1047 /// priority queue
1048 ceph::osd::scheduler::OpSchedulerRef scheduler;
1049
1050 bool stop_waiting = false;
1051
1052 ContextQueue context_queue;
1053
1054 void _attach_pg(OSDShardPGSlot *slot, PG *pg);
1055 void _detach_pg(OSDShardPGSlot *slot);
1056
1057 void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
1058 epoch_t get_min_pg_epoch();
1059 void wait_min_pg_epoch(epoch_t need);
1060
1061 /// return newest epoch we are waiting for
1062 epoch_t get_max_waiting_epoch();
1063
1064 /// push osdmap into shard
1065 void consume_map(
1066 const OSDMapRef& osdmap,
1067 unsigned *pushes_to_free);
1068
1069 void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
1070
1071 void identify_splits_and_merges(
1072 const OSDMapRef& as_of_osdmap,
1073 std::set<std::pair<spg_t,epoch_t>> *split_children,
1074 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
1075 void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
1076 void prime_splits(const OSDMapRef& as_of_osdmap,
1077 std::set<std::pair<spg_t,epoch_t>> *pgids);
1078 void prime_merges(const OSDMapRef& as_of_osdmap,
1079 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
1080 void register_and_wake_split_child(PG *pg);
1081 void unprime_split_children(spg_t parent, unsigned old_pg_num);
1082 void update_scheduler_config();
1083
1084 OSDShard(
1085 int id,
1086 CephContext *cct,
1087 OSD *osd);
1088 };
1089
1090 class OSD : public Dispatcher,
1091 public md_config_obs_t {
1092 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
1093
1094 /** OSD **/
1095 // global lock
1096 ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
1097 SafeTimer tick_timer; // safe timer (osd_lock)
1098
1099 // Tick timer for the stuff that does not need osd_lock
1100 ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
1101 SafeTimer tick_timer_without_osd_lock;
1102 std::string gss_ktfile_client{};
1103
1104 public:
1105 // config observer bits
1106 const char** get_tracked_conf_keys() const override;
1107 void handle_conf_change(const ConfigProxy& conf,
1108 const std::set <std::string> &changed) override;
1109 void update_log_config();
1110 void check_config();
1111
1112 protected:
1113
1114 const double OSD_TICK_INTERVAL = { 1.0 };
1115 double get_tick_interval() const;
1116
1117 Messenger *cluster_messenger;
1118 Messenger *client_messenger;
1119 Messenger *objecter_messenger;
1120 MonClient *monc; // check the "monc helpers" list before accessing directly
1121 MgrClient mgrc;
1122 PerfCounters *logger;
1123 PerfCounters *recoverystate_perf;
1124 ObjectStore *store;
1125 #ifdef HAVE_LIBFUSE
1126 FuseStore *fuse_store = nullptr;
1127 #endif
1128 LogClient log_client;
1129 LogChannelRef clog;
1130
1131 int whoami;
1132 std::string dev_path, journal_path;
1133
1134 ceph_release_t last_require_osd_release{ceph_release_t::unknown};
1135
1136 int numa_node = -1;
1137 size_t numa_cpu_set_size = 0;
1138 cpu_set_t numa_cpu_set;
1139
1140 bool store_is_rotational = true;
1141 bool journal_is_rotational = true;
1142
1143 ZTracer::Endpoint trace_endpoint;
1144 PerfCounters* create_logger();
1145 PerfCounters* create_recoverystate_perf();
1146 void tick();
1147 void tick_without_osd_lock();
1148 void _dispatch(Message *m);
1149 void dispatch_op(OpRequestRef op);
1150
1151 void check_osdmap_features();
1152
1153 // asok
1154 friend class OSDSocketHook;
1155 class OSDSocketHook *asok_hook;
1156 void asok_command(
1157 std::string_view prefix,
1158 const cmdmap_t& cmdmap,
1159 ceph::Formatter *f,
1160 const ceph::buffer::list& inbl,
1161 std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
1162
1163 public:
1164 int get_nodeid() { return whoami; }
1165
1166 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1167 char foo[20];
1168 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1169 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1170 }
1171 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1172 char foo[22];
1173 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1174 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1175 }
1176
1177 static ghobject_t make_snapmapper_oid() {
1178 return ghobject_t(hobject_t(
1179 sobject_t(
1180 object_t("snapmapper"),
1181 0)));
1182 }
1183 static ghobject_t make_purged_snaps_oid() {
1184 return ghobject_t(hobject_t(
1185 sobject_t(
1186 object_t("purged_snaps"),
1187 0)));
1188 }
1189
1190 static ghobject_t make_pg_log_oid(spg_t pg) {
1191 std::stringstream ss;
1192 ss << "pglog_" << pg;
1193 std::string s;
1194 getline(ss, s);
1195 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1196 }
1197
1198 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1199 std::stringstream ss;
1200 ss << "pginfo_" << pg;
1201 std::string s;
1202 getline(ss, s);
1203 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1204 }
1205 static ghobject_t make_infos_oid() {
1206 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1207 return ghobject_t(oid);
1208 }
1209
1210 static ghobject_t make_final_pool_info_oid(int64_t pool) {
1211 return ghobject_t(
1212 hobject_t(
1213 sobject_t(
1214 object_t(std::string("final_pool_") + stringify(pool)),
1215 CEPH_NOSNAP)));
1216 }
1217
1218 static ghobject_t make_pg_num_history_oid() {
1219 return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
1220 }
1221
1222 static void recursive_remove_collection(CephContext* cct,
1223 ObjectStore *store,
1224 spg_t pgid,
1225 coll_t tmp);
1226
1227 /**
1228 * get_osd_initial_compat_set()
1229 *
1230 * Get the initial feature set for this OSD. Features
1231 * here are automatically upgraded.
1232 *
1233 * Return value: Initial osd CompatSet
1234 */
1235 static CompatSet get_osd_initial_compat_set();
1236
1237 /**
1238 * get_osd_compat_set()
1239 *
1240 * Get all features supported by this OSD
1241 *
1242 * Return value: CompatSet of all supported features
1243 */
1244 static CompatSet get_osd_compat_set();
1245
1246
1247 private:
1248 class C_Tick;
1249 class C_Tick_WithoutOSDLock;
1250
1251 // -- config settings --
1252 float m_osd_pg_epoch_max_lag_factor;
1253
1254 // -- superblock --
1255 OSDSuperblock superblock;
1256
1257 void write_superblock();
1258 void write_superblock(ObjectStore::Transaction& t);
1259 int read_superblock();
1260
1261 void clear_temp_objects();
1262
1263 CompatSet osd_compat;
1264
1265 // -- state --
1266 public:
1267 typedef enum {
1268 STATE_INITIALIZING = 1,
1269 STATE_PREBOOT,
1270 STATE_BOOTING,
1271 STATE_ACTIVE,
1272 STATE_STOPPING,
1273 STATE_WAITING_FOR_HEALTHY
1274 } osd_state_t;
1275
1276 static const char *get_state_name(int s) {
1277 switch (s) {
1278 case STATE_INITIALIZING: return "initializing";
1279 case STATE_PREBOOT: return "preboot";
1280 case STATE_BOOTING: return "booting";
1281 case STATE_ACTIVE: return "active";
1282 case STATE_STOPPING: return "stopping";
1283 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1284 default: return "???";
1285 }
1286 }
1287
1288 private:
1289 std::atomic<int> state{STATE_INITIALIZING};
1290
1291 public:
1292 int get_state() const {
1293 return state;
1294 }
1295 void set_state(int s) {
1296 state = s;
1297 }
1298 bool is_initializing() const {
1299 return state == STATE_INITIALIZING;
1300 }
1301 bool is_preboot() const {
1302 return state == STATE_PREBOOT;
1303 }
1304 bool is_booting() const {
1305 return state == STATE_BOOTING;
1306 }
1307 bool is_active() const {
1308 return state == STATE_ACTIVE;
1309 }
1310 bool is_stopping() const {
1311 return state == STATE_STOPPING;
1312 }
1313 bool is_waiting_for_healthy() const {
1314 return state == STATE_WAITING_FOR_HEALTHY;
1315 }
1316
1317 private:
1318
1319 ShardedThreadPool osd_op_tp;
1320
1321 void get_latest_osdmap();
1322
1323 // -- sessions --
1324 private:
1325 void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
1326
1327 ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
1328 std::set<ceph::ref_t<Session>> session_waiting_for_map;
1329
1330 /// Caller assumes refs for included Sessions
1331 void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
1332 std::lock_guard l(session_waiting_lock);
1333 out->swap(session_waiting_for_map);
1334 }
1335 void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1336 std::lock_guard l(session_waiting_lock);
1337 session_waiting_for_map.insert(session);
1338 }
1339 void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1340 std::lock_guard l(session_waiting_lock);
1341 session_waiting_for_map.erase(session);
1342 }
1343 void dispatch_sessions_waiting_on_map() {
1344 std::set<ceph::ref_t<Session>> sessions_to_check;
1345 get_sessions_waiting_for_map(&sessions_to_check);
1346 for (auto i = sessions_to_check.begin();
1347 i != sessions_to_check.end();
1348 sessions_to_check.erase(i++)) {
1349 std::lock_guard l{(*i)->session_dispatch_lock};
1350 dispatch_session_waiting(*i, get_osdmap());
1351 }
1352 }
1353 void session_handle_reset(const ceph::ref_t<Session>& session) {
1354 std::lock_guard l(session->session_dispatch_lock);
1355 clear_session_waiting_on_map(session);
1356
1357 session->clear_backoffs();
1358
1359 /* Messages have connection refs, we need to clear the
1360 * connection->session->message->connection
1361 * cycles which result.
1362 * Bug #12338
1363 */
1364 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1365 }
1366
1367 private:
1368 /**
1369 * @defgroup monc helpers
1370 * @{
1371 * Right now we only have the one
1372 */
1373
1374 /**
1375 * Ask the Monitors for a sequence of OSDMaps.
1376 *
1377 * @param epoch The epoch to start with when replying
1378 * @param force_request True if this request forces a new subscription to
1379 * the monitors; false if an outstanding request that encompasses it is
1380 * sufficient.
1381 */
1382 void osdmap_subscribe(version_t epoch, bool force_request);
1383 /** @} monc helpers */
1384
1385 ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
1386 epoch_t latest_subscribed_epoch{0};
1387
1388 // -- heartbeat --
1389 /// information about a heartbeat peer
1390 struct HeartbeatInfo {
1391 int peer; ///< peer
1392 ConnectionRef con_front; ///< peer connection (front)
1393 ConnectionRef con_back; ///< peer connection (back)
1394 utime_t first_tx; ///< time we sent our first ping request
1395 utime_t last_tx; ///< last time we sent a ping request
1396 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1397 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1398 epoch_t epoch; ///< most recent epoch we wanted this peer
1399 /// number of connections over which we send and receive heartbeat pings/replies
1400 static constexpr int HEARTBEAT_MAX_CONN = 2;
1401 /// history of inflight pings, arranged by the timestamp at which we sent them
1402 /// send time -> deadline -> remaining replies
1403 std::map<utime_t, std::pair<utime_t, int>> ping_history;
1404
1405 utime_t hb_interval_start;
1406 uint32_t hb_average_count = 0;
1407 uint32_t hb_index = 0;
1408
1409 uint32_t hb_total_back = 0;
1410 uint32_t hb_min_back = UINT_MAX;
1411 uint32_t hb_max_back = 0;
1412 std::vector<uint32_t> hb_back_pingtime;
1413 std::vector<uint32_t> hb_back_min;
1414 std::vector<uint32_t> hb_back_max;
1415
1416 uint32_t hb_total_front = 0;
1417 uint32_t hb_min_front = UINT_MAX;
1418 uint32_t hb_max_front = 0;
1419 std::vector<uint32_t> hb_front_pingtime;
1420 std::vector<uint32_t> hb_front_min;
1421 std::vector<uint32_t> hb_front_max;
1422
1423 bool is_stale(utime_t stale) const {
1424 if (ping_history.empty()) {
1425 return false;
1426 }
1427 utime_t oldest_deadline = ping_history.begin()->second.first;
1428 return oldest_deadline <= stale;
1429 }
1430
1431 bool is_unhealthy(utime_t now) const {
1432 if (ping_history.empty()) {
1433 /// we haven't sent a ping yet or we have got all replies,
1434 /// either way we are safe and healthy for now
1435 return false;
1436 }
1437
1438 utime_t oldest_deadline = ping_history.begin()->second.first;
1439 return now > oldest_deadline;
1440 }
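// Worked example (illustrative, assuming the default 20s heartbeat grace): a ping
// sent at t=100 while both front and back connections are up is recorded as
// ping_history[100] = {120, 2}; the entry is erased once both replies arrive, so
// if either reply is still missing when now > 120, is_unhealthy() returns true.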
1441
1442 bool is_healthy(utime_t now) const {
1443 if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
1444 // do not declare ourselves healthy until we have received the first
1445 // replies from both front/back connections
1446 return false;
1447 }
1448 return !is_unhealthy(now);
1449 }
1450
1451 void clear_mark_down(Connection *except = nullptr) {
1452 if (con_back && con_back != except) {
1453 con_back->mark_down();
1454 con_back->clear_priv();
1455 con_back.reset(nullptr);
1456 }
1457 if (con_front && con_front != except) {
1458 con_front->mark_down();
1459 con_front->clear_priv();
1460 con_front.reset(nullptr);
1461 }
1462 }
1463 };
1464
1465 ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
1466 std::map<int, int> debug_heartbeat_drops_remaining;
1467 ceph::condition_variable heartbeat_cond;
1468 bool heartbeat_stop;
1469 std::atomic<bool> heartbeat_need_update;
1470 std::map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1471 utime_t last_mon_heartbeat;
1472 Messenger *hb_front_client_messenger;
1473 Messenger *hb_back_client_messenger;
1474 Messenger *hb_front_server_messenger;
1475 Messenger *hb_back_server_messenger;
1476 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1477 double daily_loadavg;
1478 ceph::mono_time startup_time;
1479
1480 // Track ping response times using a vector as a circular buffer
1481 // MUST BE A POWER OF 2
1482 const uint32_t hb_vector_size = 16;
1483
1484 void _add_heartbeat_peer(int p);
1485 void _remove_heartbeat_peer(int p);
1486 bool heartbeat_reset(Connection *con);
1487 void maybe_update_heartbeat_peers();
1488 void reset_heartbeat_peers(bool all);
1489 bool heartbeat_peers_need_update() {
1490 return heartbeat_need_update.load();
1491 }
1492 void heartbeat_set_peers_need_update() {
1493 heartbeat_need_update.store(true);
1494 }
1495 void heartbeat_clear_peers_need_update() {
1496 heartbeat_need_update.store(false);
1497 }
1498 void heartbeat();
1499 void heartbeat_check();
1500 void heartbeat_entry();
1501 void need_heartbeat_peer_update();
1502
1503 void heartbeat_kick() {
1504 std::lock_guard l(heartbeat_lock);
1505 heartbeat_cond.notify_all();
1506 }
1507
1508 struct T_Heartbeat : public Thread {
1509 OSD *osd;
1510 explicit T_Heartbeat(OSD *o) : osd(o) {}
1511 void *entry() override {
1512 osd->heartbeat_entry();
1513 return 0;
1514 }
1515 } heartbeat_thread;
1516
1517 public:
1518 bool heartbeat_dispatch(Message *m);
1519
1520 struct HeartbeatDispatcher : public Dispatcher {
1521 OSD *osd;
1522 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1523
1524 bool ms_can_fast_dispatch_any() const override { return true; }
1525 bool ms_can_fast_dispatch(const Message *m) const override {
1526 switch (m->get_type()) {
1527 case CEPH_MSG_PING:
1528 case MSG_OSD_PING:
1529 return true;
1530 default:
1531 return false;
1532 }
1533 }
1534 void ms_fast_dispatch(Message *m) override {
1535 osd->heartbeat_dispatch(m);
1536 }
1537 bool ms_dispatch(Message *m) override {
1538 return osd->heartbeat_dispatch(m);
1539 }
1540 bool ms_handle_reset(Connection *con) override {
1541 return osd->heartbeat_reset(con);
1542 }
1543 void ms_handle_remote_reset(Connection *con) override {}
1544 bool ms_handle_refused(Connection *con) override {
1545 return osd->ms_handle_refused(con);
1546 }
1547 int ms_handle_authentication(Connection *con) override {
1548 return true;
1549 }
1550 } heartbeat_dispatcher;
1551
1552 private:
1553 // -- waiters --
1554 std::list<OpRequestRef> finished;
1555
1556 void take_waiters(std::list<OpRequestRef>& ls) {
1557 ceph_assert(ceph_mutex_is_locked(osd_lock));
1558 finished.splice(finished.end(), ls);
1559 }
1560 void do_waiters();
1561
1562 // -- op tracking --
1563 OpTracker op_tracker;
1564 void test_ops(std::string command, std::string args, std::ostream& ss);
1565 friend class TestOpsSocketHook;
1566 TestOpsSocketHook *test_ops_hook;
1567 friend struct C_FinishSplits;
1568 friend struct C_OpenPGs;
1569
1570 protected:
1571
1572 /*
1573 * The ordered op delivery chain is:
1574 *
1575 * fast dispatch -> scheduler back
1576 * scheduler front <-> to_process back
1577 * to_process front -> RunVis(item)
1578 * <- queue_front()
1579 *
1580 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1581 * pushed back up into to_process and/or scheduler while order is preserved.
1582 *
1583 * Multiple worker threads can operate on each shard.
1584 *
1585 * Under normal circumstances, num_running == to_process.size(). There are
1586 * two times when that is not true: (1) when waiting_for_pg == true and
1587 * to_process is accumulating requests that are waiting for the pg to be
1588 * instantiated; in that case they will all get requeued together by
1589 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared
1590 * waiting_for_pg, and already requeued the items.
1591 */
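/*
 * Illustrative sketch (an assumption, not actual code): the chain above
 * corresponds roughly to
 *
 *   // fast dispatch thread
 *   op_shardedwq.queue(OpSchedulerItem(...));      // -> scheduler back
 *
 *   // ShardedOpWQ worker thread (_process)
 *   auto item = sdata->scheduler->dequeue();       // scheduler front
 *   slot->to_process.push_back(std::move(item));   // -> to_process back
 *   // run the front item; it may be pushed back via _enqueue_front()
 */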
1592 friend class ceph::osd::scheduler::PGOpItem;
1593 friend class ceph::osd::scheduler::PGPeeringItem;
1594 friend class ceph::osd::scheduler::PGRecovery;
1595 friend class ceph::osd::scheduler::PGRecoveryMsg;
1596 friend class ceph::osd::scheduler::PGDelete;
1597
1598 class ShardedOpWQ
1599 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
1600 {
1601 OSD *osd;
1602
1603 public:
1604 ShardedOpWQ(OSD *o,
1605 ceph::timespan ti,
1606 ceph::timespan si,
1607 ShardedThreadPool* tp)
1608 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
1609 osd(o) {
1610 }
1611
1612 void _add_slot_waiter(
1613 spg_t token,
1614 OSDShardPGSlot *slot,
1615 OpSchedulerItem&& qi);
1616
1617 /// try to do some work
1618 void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
1619
1620 /// enqueue a new item
1621 void _enqueue(OpSchedulerItem&& item) override;
1622
1623 /// requeue an old item (at the front of the line)
1624 void _enqueue_front(OpSchedulerItem&& item) override;
1625
1626 void return_waiting_threads() override {
1627 for(uint32_t i = 0; i < osd->num_shards; i++) {
1628 OSDShard* sdata = osd->shards[i];
1629 assert (NULL != sdata);
1630 std::scoped_lock l{sdata->sdata_wait_lock};
1631 sdata->stop_waiting = true;
1632 sdata->sdata_cond.notify_all();
1633 }
1634 }
1635
1636 void stop_return_waiting_threads() override {
1637 for(uint32_t i = 0; i < osd->num_shards; i++) {
1638 OSDShard* sdata = osd->shards[i];
1639 assert (NULL != sdata);
1640 std::scoped_lock l{sdata->sdata_wait_lock};
1641 sdata->stop_waiting = false;
1642 }
1643 }
1644
1645 void dump(ceph::Formatter *f) {
1646 for(uint32_t i = 0; i < osd->num_shards; i++) {
1647 auto &&sdata = osd->shards[i];
1648
1649 char queue_name[32] = {0};
1650 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1651 ceph_assert(NULL != sdata);
1652
1653 std::scoped_lock l{sdata->shard_lock};
1654 f->open_object_section(queue_name);
1655 sdata->scheduler->dump(*f);
1656 f->close_section();
1657 }
1658 }
1659
1660 bool is_shard_empty(uint32_t thread_index) override {
1661 uint32_t shard_index = thread_index % osd->num_shards;
1662 auto &&sdata = osd->shards[shard_index];
1663 ceph_assert(sdata);
1664 std::lock_guard l(sdata->shard_lock);
1665 if (thread_index < osd->num_shards) {
1666 return sdata->scheduler->empty() && sdata->context_queue.empty();
1667 } else {
1668 return sdata->scheduler->empty();
1669 }
1670 }
1671
1672 void handle_oncommits(std::list<Context*>& oncommits) {
1673 for (auto p : oncommits) {
1674 p->complete(0);
1675 }
1676 }
1677 } op_shardedwq;
1678
1679
1680 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
1681 void dequeue_op(
1682 PGRef pg, OpRequestRef op,
1683 ThreadPool::TPHandle &handle);
1684
1685 void enqueue_peering_evt(
1686 spg_t pgid,
1687 PGPeeringEventRef ref);
1688 void dequeue_peering_evt(
1689 OSDShard *sdata,
1690 PG *pg,
1691 PGPeeringEventRef ref,
1692 ThreadPool::TPHandle& handle);
1693
1694 void dequeue_delete(
1695 OSDShard *sdata,
1696 PG *pg,
1697 epoch_t epoch,
1698 ThreadPool::TPHandle& handle);
1699
1700 friend class PG;
1701 friend struct OSDShard;
1702 friend class PrimaryLogPG;
1703 friend class PgScrubber;
1704
1705
1706 protected:
1707
1708 // -- osd map --
1709 // TODO: switch to std::atomic<OSDMapRef> when C++20 is available.
1710 OSDMapRef _osdmap;
1711 void set_osdmap(OSDMapRef osdmap) {
1712 std::atomic_store(&_osdmap, osdmap);
1713 }
1714 OSDMapRef get_osdmap() const {
1715 return std::atomic_load(&_osdmap);
1716 }
1717 epoch_t get_osdmap_epoch() const {
1718 // XXX: performance?
1719 auto osdmap = get_osdmap();
1720 return osdmap ? osdmap->get_epoch() : 0;
1721 }
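// Illustrative sketch (an assumption): with C++20 the TODO above becomes simply
//
//   std::atomic<OSDMapRef> _osdmap;
//   void set_osdmap(OSDMapRef osdmap) { _osdmap.store(std::move(osdmap)); }
//   OSDMapRef get_osdmap() const { return _osdmap.load(); }
//
// i.e. the free-function std::atomic_load/std::atomic_store overloads for
// std::shared_ptr are replaced by the std::atomic<std::shared_ptr<T>> specialization.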
1722
1723 pool_pg_num_history_t pg_num_history;
1724
1725 ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
1726 std::list<OpRequestRef> waiting_for_osdmap;
1727 std::deque<utime_t> osd_markdown_log;
1728
1729 friend struct send_map_on_destruct;
1730
1731 void wait_for_new_map(OpRequestRef op);
1732 void handle_osd_map(class MOSDMap *m);
1733 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1734 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1735 void note_down_osd(int osd);
1736 void note_up_osd(int osd);
1737 friend struct C_OnMapCommit;
1738
1739 bool advance_pg(
1740 epoch_t advance_to,
1741 PG *pg,
1742 ThreadPool::TPHandle &handle,
1743 PeeringCtx &rctx);
1744 void consume_map();
1745 void activate_map();
1746
1747 // osd map cache (past osd maps)
1748 OSDMapRef get_map(epoch_t e) {
1749 return service.get_map(e);
1750 }
1751 OSDMapRef add_map(OSDMap *o) {
1752 return service.add_map(o);
1753 }
1754 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
1755 return service.get_map_bl(e, bl);
1756 }
1757
1758 public:
1759 // -- shards --
1760 std::vector<OSDShard*> shards;
1761 uint32_t num_shards = 0;
1762
1763 void inc_num_pgs() {
1764 ++num_pgs;
1765 }
1766 void dec_num_pgs() {
1767 --num_pgs;
1768 }
1769 int get_num_pgs() const {
1770 return num_pgs;
1771 }
1772
1773 protected:
1774 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
1775 /// merge epoch -> target pgid -> source pgid -> pg
1776 std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
1777
1778 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1779 unsigned need);
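// Illustrative access pattern for merge_waiters (names and values are
// hypothetical; the actual bookkeeping is done by add_merge_waiter()):
//
//   std::lock_guard l(merge_lock);
//   auto& sources = merge_waiters[epoch][target_pgid]; // std::map<spg_t,PGRef>
//   sources[source_pgid] = source_pg;
//   bool all_sources_present = (sources.size() >= need);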
1780
1781 // -- placement groups --
1782 std::atomic<size_t> num_pgs = {0};
1783
1784 std::mutex pending_creates_lock;
1785 using create_from_osd_t = std::pair<spg_t, bool /* is primary */>;
1786 std::set<create_from_osd_t> pending_creates_from_osd;
1787 unsigned pending_creates_from_mon = 0;
1788
1789 PGRecoveryStats pg_recovery_stats;
1790
1791 PGRef _lookup_pg(spg_t pgid);
1792 PGRef _lookup_lock_pg(spg_t pgid);
1793 void register_pg(PGRef pg);
1794 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
1795
1796 void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
1797 void _get_pgids(std::vector<spg_t> *v);
1798
1799 public:
1800 PGRef lookup_lock_pg(spg_t pgid);
1801
1802 std::set<int64_t> get_mapped_pools();
1803
1804 protected:
1805 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
1806
1807 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1808 spg_t pgid, bool is_mon_create);
1809 void resume_creating_pg();
1810
1811 void load_pgs();
1812
1813 /// build initial pg history and intervals on create
1814 void build_initial_pg_history(
1815 spg_t pgid,
1816 epoch_t created,
1817 utime_t created_stamp,
1818 pg_history_t *h,
1819 PastIntervals *pi);
1820
1821 epoch_t last_pg_create_epoch;
1822
1823 void handle_pg_create(OpRequestRef op);
1824
1825 void split_pgs(
1826 PG *parent,
1827 const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
1828 OSDMapRef curmap,
1829 OSDMapRef nextmap,
1830 PeeringCtx &rctx);
1831 void _finish_splits(std::set<PGRef>& pgs);
1832
1833 // == monitor interaction ==
1834 ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
1835 utime_t last_mon_report;
1836 Finisher boot_finisher;
1837
1838 // -- boot --
1839 void start_boot();
1840 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
1841 void _preboot(epoch_t oldest, epoch_t newest);
1842 void _send_boot();
1843 void _collect_metadata(std::map<std::string,std::string> *pmeta);
1844 void _get_purged_snaps();
1845 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
1846
1847 void start_waiting_for_healthy();
1848 bool _is_healthy();
1849
1850 void send_full_update();
1851
1852 friend struct CB_OSD_GetVersion;
1853
1854 // -- alive --
1855 epoch_t up_thru_wanted;
1856
1857 void queue_want_up_thru(epoch_t want);
1858 void send_alive();
1859
1860 // -- full map requests --
1861 epoch_t requested_full_first, requested_full_last;
1862
1863 void request_full_map(epoch_t first, epoch_t last);
1864 void rerequest_full_maps() {
1865 epoch_t first = requested_full_first;
1866 epoch_t last = requested_full_last;
1867 requested_full_first = 0;
1868 requested_full_last = 0;
1869 request_full_map(first, last);
1870 }
1871 void got_full_map(epoch_t e);
1872
1873 // -- failures --
1874 std::map<int,utime_t> failure_queue;
1875 std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
1876
1877 void requeue_failures();
1878 void send_failures();
1879 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
1880 void cancel_pending_failures();
1881
1882 ceph::coarse_mono_clock::time_point last_sent_beacon;
1883 ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
1884 epoch_t min_last_epoch_clean = 0;
1885 // which pgs were scanned for min_lec
1886 std::vector<pg_t> min_last_epoch_clean_pgs;
1887 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
1888
1889 ceph_tid_t get_tid() {
1890 return service.get_tid();
1891 }
1892
1893 double scrub_sleep_time(bool must_scrub);
1894
1895 // -- generic pg peering --
1896 PeeringCtx create_context();
1897 void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
1898 ThreadPool::TPHandle *handle = NULL);
1899
1900 bool require_mon_peer(const Message *m);
1901 bool require_mon_or_mgr_peer(const Message *m);
1902 bool require_osd_peer(const Message *m);
1903 /**
1904 * Verifies that we were alive in the given epoch, and that
1905 * we still are.
1906 */
1907 bool require_self_aliveness(const Message *m, epoch_t alive_since);
1908 /**
1909 * Verifies that the OSD that sent the given op has the same
1910 * address as in the given OSDMap.
1911 * @pre op was sent by an OSD using the cluster messenger
1912 */
1913 bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
1914 bool is_fast_dispatch);
1915
1916 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
1917 bool is_fast_dispatch);
1918
1919 void handle_fast_pg_create(MOSDPGCreate2 *m);
1920 void handle_fast_pg_query(MOSDPGQuery *m);
1921 void handle_pg_query_nopg(const MQuery& q);
1922 void handle_fast_pg_notify(MOSDPGNotify *m);
1923 void handle_pg_notify_nopg(const MNotifyRec& q);
1924 void handle_fast_pg_info(MOSDPGInfo *m);
1925 void handle_fast_pg_remove(MOSDPGRemove *m);
1926
1927 public:
1928 // used by OSDShard
1929 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
1930 protected:
1931
1932 void handle_fast_force_recovery(MOSDForceRecovery *m);
1933
1934 // -- commands --
1935 void handle_command(class MCommand *m);
1936
1937
1938 // -- pg recovery --
1939 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
1940 ThreadPool::TPHandle &handle);
1941
1942
1943 // -- scrubbing --
1944 void sched_scrub();
1945 void resched_all_scrubs();
1946 bool scrub_random_backoff();
1947 bool scrub_load_below_threshold();
1948 bool scrub_time_permit(utime_t now);
1949
1950 // -- status reporting --
1951 MPGStats *collect_pg_stats();
1952 std::vector<DaemonHealthMetric> get_health_metrics();
1953
1954
1955 private:
1956 bool ms_can_fast_dispatch_any() const override { return true; }
1957 bool ms_can_fast_dispatch(const Message *m) const override {
1958 switch (m->get_type()) {
1959 case CEPH_MSG_PING:
1960 case CEPH_MSG_OSD_OP:
1961 case CEPH_MSG_OSD_BACKOFF:
1962 case MSG_OSD_SCRUB2:
1963 case MSG_OSD_FORCE_RECOVERY:
1964 case MSG_MON_COMMAND:
1965 case MSG_OSD_PG_CREATE2:
1966 case MSG_OSD_PG_QUERY:
1967 case MSG_OSD_PG_QUERY2:
1968 case MSG_OSD_PG_INFO:
1969 case MSG_OSD_PG_INFO2:
1970 case MSG_OSD_PG_NOTIFY:
1971 case MSG_OSD_PG_NOTIFY2:
1972 case MSG_OSD_PG_LOG:
1973 case MSG_OSD_PG_TRIM:
1974 case MSG_OSD_PG_REMOVE:
1975 case MSG_OSD_BACKFILL_RESERVE:
1976 case MSG_OSD_RECOVERY_RESERVE:
1977 case MSG_OSD_REPOP:
1978 case MSG_OSD_REPOPREPLY:
1979 case MSG_OSD_PG_PUSH:
1980 case MSG_OSD_PG_PULL:
1981 case MSG_OSD_PG_PUSH_REPLY:
1982 case MSG_OSD_PG_SCAN:
1983 case MSG_OSD_PG_BACKFILL:
1984 case MSG_OSD_PG_BACKFILL_REMOVE:
1985 case MSG_OSD_EC_WRITE:
1986 case MSG_OSD_EC_WRITE_REPLY:
1987 case MSG_OSD_EC_READ:
1988 case MSG_OSD_EC_READ_REPLY:
1989 case MSG_OSD_SCRUB_RESERVE:
1990 case MSG_OSD_REP_SCRUB:
1991 case MSG_OSD_REP_SCRUBMAP:
1992 case MSG_OSD_PG_UPDATE_LOG_MISSING:
1993 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
1994 case MSG_OSD_PG_RECOVERY_DELETE:
1995 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
1996 case MSG_OSD_PG_LEASE:
1997 case MSG_OSD_PG_LEASE_ACK:
1998 return true;
1999 default:
2000 return false;
2001 }
2002 }
2003 void ms_fast_dispatch(Message *m) override;
2004 bool ms_dispatch(Message *m) override;
2005 void ms_handle_connect(Connection *con) override;
2006 void ms_handle_fast_connect(Connection *con) override;
2007 void ms_handle_fast_accept(Connection *con) override;
2008 int ms_handle_authentication(Connection *con) override;
2009 bool ms_handle_reset(Connection *con) override;
2010 void ms_handle_remote_reset(Connection *con) override {}
2011 bool ms_handle_refused(Connection *con) override;
2012
2013 public:
2014 /* internal and external can point to the same messenger; they will still
2015 * be cleaned up properly. */
2016 OSD(CephContext *cct_,
2017 ObjectStore *store_,
2018 int id,
2019 Messenger *internal,
2020 Messenger *external,
2021 Messenger *hb_front_client,
2022 Messenger *hb_back_client,
2023 Messenger *hb_front_server,
2024 Messenger *hb_back_server,
2025 Messenger *osdc_messenger,
2026 MonClient *mc, const std::string &dev, const std::string &jdev,
2027 ceph::async::io_context_pool& poolctx);
2028 ~OSD() override;
2029
2030 // static bits
2031 static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, std::string osdspec_affinity);
2032
2033 /* remove any non-user xattrs from a map of attributes */
2034 void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
2035 for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
2036 iter != attrs.end();
2037 ) {
2038 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2039 iter = attrs.erase(iter); // not a user xattr: erase and advance
2040 else ++iter;
2041 }
2042 }
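// Example of the intended effect (attribute names are made up): given attrs
// containing { "_user.note", "_", "ceph.internal" }, filter_xattrs() keeps
// "_user.note" (leading '_' and longer than one character) and erases
// "_" and "ceph.internal".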
2043
2044 private:
2045 int mon_cmd_maybe_osd_create(std::string &cmd);
2046 int update_crush_device_class();
2047 int update_crush_location();
2048
2049 static int write_meta(CephContext *cct,
2050 ObjectStore *store,
2051 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
2052
2053 void handle_scrub(class MOSDScrub *m);
2054 void handle_fast_scrub(class MOSDScrub2 *m);
2055 void handle_osd_ping(class MOSDPing *m);
2056
2057 size_t get_num_cache_shards();
2058 int get_num_op_shards();
2059 int get_num_op_threads();
2060
2061 float get_osd_recovery_sleep();
2062 float get_osd_delete_sleep();
2063 float get_osd_snap_trim_sleep();
2064
2065 int get_recovery_max_active();
2066 void maybe_override_max_osd_capacity_for_qos();
2067 bool maybe_override_options_for_qos();
2068 int run_osd_bench_test(int64_t count,
2069 int64_t bsize,
2070 int64_t osize,
2071 int64_t onum,
2072 double *elapsed,
2073 std::ostream& ss);
2074 int mon_cmd_set_config(const std::string &key, const std::string &val);
2075
2076 void scrub_purged_snaps();
2077 void probe_smart(const std::string& devid, std::ostream& ss);
2078
2079 public:
2080 static int peek_meta(ObjectStore *store,
2081 std::string *magic,
2082 uuid_d *cluster_fsid,
2083 uuid_d *osd_fsid,
2084 int *whoami,
2085 ceph_release_t *min_osd_release);
2086
2087
2088 // startup/shutdown
2089 int pre_init();
2090 int init();
2091 void final_init();
2092
2093 int enable_disable_fuse(bool stop);
2094 int set_numa_affinity();
2095
2096 void suicide(int exitcode);
2097 int shutdown();
2098
2099 void handle_signal(int signum);
2100
2101 /// check if we can throw out an op from a disconnected client
2102 static bool op_is_discardable(const MOSDOp *m);
2103
2104 public:
2105 OSDService service;
2106 friend class OSDService;
2107
2108 private:
2109 void set_perf_queries(const ConfigPayload &config_payload);
2110 MetricPayload get_perf_reports();
2111
2112 ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
2113 std::list<OSDPerfMetricQuery> m_perf_queries;
2114 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
2115 };
2116
2117
2118 // compatibility of the executable
2119 extern const CompatSet::Feature ceph_osd_feature_compat[];
2120 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2121 extern const CompatSet::Feature ceph_osd_feature_incompat[];
2122
2123 #endif // CEPH_OSD_H