ceph/src/osd/OSD.h (ceph.git, v15.2.4)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_H
16 #define CEPH_OSD_H
17
18 #include "PG.h"
19
20 #include "msg/Dispatcher.h"
21
22 #include "common/Timer.h"
23 #include "common/WorkQueue.h"
24 #include "common/AsyncReserver.h"
25 #include "common/ceph_context.h"
26 #include "common/config_cacher.h"
27 #include "common/zipkin_trace.h"
28 #include "common/ceph_timer.h"
29
30 #include "mgr/MgrClient.h"
31
32 #include "os/ObjectStore.h"
33 #include "OSDCap.h"
34
35 #include "auth/KeyRing.h"
36
37 #include "osd/ClassHandler.h"
38
39 #include "include/CompatSet.h"
40 #include "include/common_fwd.h"
41
42 #include "OpRequest.h"
43 #include "Session.h"
44
45 #include "osd/scheduler/OpScheduler.h"
46
47 #include <atomic>
48 #include <map>
49 #include <memory>
50 #include <string>
51
52 #include "include/unordered_map.h"
53
54 #include "common/shared_cache.hpp"
55 #include "common/simple_cache.hpp"
56 #include "common/sharedptr_registry.hpp"
57 #include "common/WeightedPriorityQueue.h"
58 #include "common/PrioritizedQueue.h"
59 #include "messages/MOSDOp.h"
60 #include "common/EventTrace.h"
61 #include "osd/osd_perf_counters.h"
62
63 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
64
65 /*
66
67 lock ordering for pg map
68
69 PG::lock
70 ShardData::lock
71 OSD::pg_map_lock
72
73 */
74
75 class Messenger;
76 class Message;
77 class MonClient;
78 class ObjectStore;
79 class FuseStore;
80 class OSDMap;
81 class MLog;
82 class Objecter;
83 class KeyStore;
84
85 class Watch;
86 class PrimaryLogPG;
87
88 class TestOpsSocketHook;
89 struct C_FinishSplits;
90 struct C_OpenPGs;
91 class LogChannel;
92 class MOSDOp;
93
94 class MOSDPGCreate2;
95 class MOSDPGQuery;
96 class MOSDPGNotify;
97 class MOSDPGInfo;
98 class MOSDPGRemove;
99 class MOSDForceRecovery;
100 class MMonGetPurgedSnapsReply;
101
102 class OSD;
103
104 class OSDService {
105 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
106 public:
107 OSD *osd;
108 CephContext *cct;
109 ObjectStore::CollectionHandle meta_ch;
110 const int whoami;
111 ObjectStore *&store;
112 LogClient &log_client;
113 LogChannelRef clog;
114 PGRecoveryStats &pg_recovery_stats;
115 private:
116 Messenger *&cluster_messenger;
117 Messenger *&client_messenger;
118 public:
119 PerfCounters *&logger;
120 PerfCounters *&recoverystate_perf;
121 MonClient *&monc;
122
123 md_config_cacher_t<Option::size_t> osd_max_object_size;
124 md_config_cacher_t<bool> osd_skip_data_digest;
125
126 void enqueue_back(OpSchedulerItem&& qi);
127 void enqueue_front(OpSchedulerItem&& qi);
128
129 void maybe_inject_dispatch_delay() {
130 if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
131 if (rand() % 10000 <
132 g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
133 utime_t t;
134 t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
135 t.sleep();
136 }
137 }
138 }
139
140 ceph::signedspan get_mnow();
141
142 private:
143 // -- superblock --
144 ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
145 OSDSuperblock superblock;
146
147 public:
148 OSDSuperblock get_superblock() {
149 std::lock_guard l(publish_lock);
150 return superblock;
151 }
152 void publish_superblock(const OSDSuperblock &block) {
153 std::lock_guard l(publish_lock);
154 superblock = block;
155 }
156
157 int get_nodeid() const { return whoami; }
158
159 std::atomic<epoch_t> max_oldest_map;
160 private:
161 OSDMapRef osdmap;
162
163 public:
164 OSDMapRef get_osdmap() {
165 std::lock_guard l(publish_lock);
166 return osdmap;
167 }
168 epoch_t get_osdmap_epoch() {
169 std::lock_guard l(publish_lock);
170 return osdmap ? osdmap->get_epoch() : 0;
171 }
172 void publish_map(OSDMapRef map) {
173 std::lock_guard l(publish_lock);
174 osdmap = map;
175 }
176
177 /*
178 * osdmap - current published map
179  * next_osdmap - pre-published map that is about to be published.
180 *
181 * We use the next_osdmap to send messages and initiate connections,
182 * but only if the target is the same instance as the one in the map
183 * epoch the current user is working from (i.e., the result is
184 * equivalent to what is in next_osdmap).
185 *
186 * This allows the helpers to start ignoring osds that are about to
187 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
188 * down, without worrying about reopening connections from threads
189 * working from old maps.
190 */
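  /*
   * Illustrative sketch (not part of the original header) of how the
   * reservation helpers declared below are meant to be used when messaging
   * against the about-to-be-published map; `peer` and `m` are hypothetical
   * placeholders:
   *
   *   OSDMapRef nextmap = get_nextmap_reserved();   // pins nextmap's epoch
   *   if (nextmap) {
   *     ConnectionRef con = get_con_osd_cluster(peer, nextmap->get_epoch());
   *     if (con)
   *       send_message_osd_cluster(m, con);
   *     release_map(nextmap);                        // drops the reservation
   *   }
   */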
191 private:
192 OSDMapRef next_osdmap;
193 ceph::condition_variable pre_publish_cond;
194 int pre_publish_waiter = 0;
195
196 public:
197 void pre_publish_map(OSDMapRef map) {
198 std::lock_guard l(pre_publish_lock);
199 next_osdmap = std::move(map);
200 }
201
202 void activate_map();
203 /// map epochs reserved below
204 map<epoch_t, unsigned> map_reservations;
205
206 /// gets ref to next_osdmap and registers the epoch as reserved
207 OSDMapRef get_nextmap_reserved() {
208 std::lock_guard l(pre_publish_lock);
209 if (!next_osdmap)
210 return OSDMapRef();
211 epoch_t e = next_osdmap->get_epoch();
212 map<epoch_t, unsigned>::iterator i =
213 map_reservations.insert(make_pair(e, 0)).first;
214 i->second++;
215 return next_osdmap;
216 }
217 /// releases reservation on map
218 void release_map(OSDMapRef osdmap) {
219 std::lock_guard l(pre_publish_lock);
220 map<epoch_t, unsigned>::iterator i =
221 map_reservations.find(osdmap->get_epoch());
222 ceph_assert(i != map_reservations.end());
223 ceph_assert(i->second > 0);
224 if (--(i->second) == 0) {
225 map_reservations.erase(i);
226 }
227 if (pre_publish_waiter) {
228 pre_publish_cond.notify_all();
229 }
230 }
231 /// blocks until there are no reserved maps prior to next_osdmap
232 void await_reserved_maps() {
233 std::unique_lock l{pre_publish_lock};
234 ceph_assert(next_osdmap);
235 pre_publish_waiter++;
236 pre_publish_cond.wait(l, [this] {
237 auto i = map_reservations.cbegin();
238 return (i == map_reservations.cend() ||
239 i->first >= next_osdmap->get_epoch());
240 });
241 pre_publish_waiter--;
242 }
243 OSDMapRef get_next_osdmap() {
244 std::lock_guard l(pre_publish_lock);
245 if (!next_osdmap)
246 return OSDMapRef();
247 return next_osdmap;
248 }
249
250 void maybe_share_map(Connection *con,
251 const OSDMapRef& osdmap,
252 epoch_t peer_epoch_lb=0);
253
254 void send_map(class MOSDMap *m, Connection *con);
255 void send_incremental_map(epoch_t since, Connection *con,
256 const OSDMapRef& osdmap);
257 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
258 OSDSuperblock& superblock);
259
260 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
261 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
262 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
263 void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
264 void send_message_osd_cluster(Message *m, Connection *con) {
265 con->send_message(m);
266 }
267 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
268 con->send_message(m);
269 }
270 void send_message_osd_client(Message *m, const ConnectionRef& con) {
271 con->send_message(m);
272 }
273 entity_name_t get_cluster_msgr_name() const;
274
275 private:
276 // -- scrub scheduling --
277 ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
278 int scrubs_local;
279 int scrubs_remote;
280
281 public:
282 struct ScrubJob {
283 CephContext* cct;
284 /// pg to be scrubbed
285 spg_t pgid;
286     /// the time the scrub is scheduled for; the scrub could be delayed if the
287     /// system load is too high or it falls outside the configured scrub hours
288 utime_t sched_time;
289 /// the hard upper bound of scrub time
290 utime_t deadline;
291 ScrubJob() : cct(nullptr) {}
292 explicit ScrubJob(CephContext* cct, const spg_t& pg,
293 const utime_t& timestamp,
294 double pool_scrub_min_interval = 0,
295 double pool_scrub_max_interval = 0, bool must = true);
296 /// order the jobs by sched_time
297 bool operator<(const ScrubJob& rhs) const;
298 };
299 set<ScrubJob> sched_scrub_pg;
300
301   /// @returns the scrub_reg_stamp used to unregister the scrub job
302 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
303 double pool_scrub_max_interval, bool must) {
304 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
305 must);
306 std::lock_guard l(sched_scrub_lock);
307 sched_scrub_pg.insert(scrub);
308 return scrub.sched_time;
309 }
310 void unreg_pg_scrub(spg_t pgid, utime_t t) {
311 std::lock_guard l(sched_scrub_lock);
312 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
313 ceph_assert(removed);
314 }
315 bool first_scrub_stamp(ScrubJob *out) {
316 std::lock_guard l(sched_scrub_lock);
317 if (sched_scrub_pg.empty())
318 return false;
319 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
320 *out = *iter;
321 return true;
322 }
323 bool next_scrub_stamp(const ScrubJob& next,
324 ScrubJob *out) {
325 std::lock_guard l(sched_scrub_lock);
326 if (sched_scrub_pg.empty())
327 return false;
328 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
329 if (iter == sched_scrub_pg.cend())
330 return false;
331 ++iter;
332 if (iter == sched_scrub_pg.cend())
333 return false;
334 *out = *iter;
335 return true;
336 }
337
338 void dumps_scrub(Formatter *f) {
339 ceph_assert(f != nullptr);
340 std::lock_guard l(sched_scrub_lock);
341
342 f->open_array_section("scrubs");
343 for (const auto &i: sched_scrub_pg) {
344 f->open_object_section("scrub");
345 f->dump_stream("pgid") << i.pgid;
346 f->dump_stream("sched_time") << i.sched_time;
347 f->dump_stream("deadline") << i.deadline;
348 f->dump_bool("forced", i.sched_time == PG::Scrubber::scrub_must_stamp());
349 f->close_section();
350 }
351 f->close_section();
352 }
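  /*
   * Rough sketch (not in the original source) of the shape dumps_scrub()
   * produces through a JSON Formatter; exact timestamp and pgid formatting
   * depend on the Formatter and stream operators, so treat the values as
   * placeholders:
   *
   *   {"scrubs": [
   *     {"pgid": "1.0",
   *      "sched_time": "2020-07-01 12:00:00.000000",
   *      "deadline": "2020-07-08 12:00:00.000000",
   *      "forced": false}
   *   ]}
   */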
353
354 bool can_inc_scrubs();
355 bool inc_scrubs_local();
356 void dec_scrubs_local();
357 bool inc_scrubs_remote();
358 void dec_scrubs_remote();
359 void dump_scrub_reservations(Formatter *f);
360
361 void reply_op_error(OpRequestRef op, int err);
362 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
363 vector<pg_log_op_return_item_t> op_returns);
364 void handle_misdirected_op(PG *pg, OpRequestRef op);
365
366
367 private:
368 // -- agent shared state --
369 ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
370 ceph::condition_variable agent_cond;
371 map<uint64_t, set<PGRef> > agent_queue;
372 set<PGRef>::iterator agent_queue_pos;
373 bool agent_valid_iterator;
374 int agent_ops;
375   int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
376 set<hobject_t> agent_oids;
377 bool agent_active;
378 struct AgentThread : public Thread {
379 OSDService *osd;
380 explicit AgentThread(OSDService *o) : osd(o) {}
381 void *entry() override {
382 osd->agent_entry();
383 return NULL;
384 }
385 } agent_thread;
386 bool agent_stop_flag;
387 ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
388 SafeTimer agent_timer;
389
390 public:
391 void agent_entry();
392 void agent_stop();
393
394 void _enqueue(PG *pg, uint64_t priority) {
395 if (!agent_queue.empty() &&
396 agent_queue.rbegin()->first < priority)
397 agent_valid_iterator = false; // inserting higher-priority queue
398 set<PGRef>& nq = agent_queue[priority];
399 if (nq.empty())
400 agent_cond.notify_all();
401 nq.insert(pg);
402 }
403
404 void _dequeue(PG *pg, uint64_t old_priority) {
405 set<PGRef>& oq = agent_queue[old_priority];
406 set<PGRef>::iterator p = oq.find(pg);
407 ceph_assert(p != oq.end());
408 if (p == agent_queue_pos)
409 ++agent_queue_pos;
410 oq.erase(p);
411 if (oq.empty()) {
412 if (agent_queue.rbegin()->first == old_priority)
413 agent_valid_iterator = false;
414 agent_queue.erase(old_priority);
415 }
416 }
417
418 /// enable agent for a pg
419 void agent_enable_pg(PG *pg, uint64_t priority) {
420 std::lock_guard l(agent_lock);
421 _enqueue(pg, priority);
422 }
423
424   /// adjust priority for an enabled pg
425 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
426 std::lock_guard l(agent_lock);
427 ceph_assert(new_priority != old_priority);
428 _enqueue(pg, new_priority);
429 _dequeue(pg, old_priority);
430 }
431
432 /// disable agent for a pg
433 void agent_disable_pg(PG *pg, uint64_t old_priority) {
434 std::lock_guard l(agent_lock);
435 _dequeue(pg, old_priority);
436 }
437
438 /// note start of an async (evict) op
439 void agent_start_evict_op() {
440 std::lock_guard l(agent_lock);
441 ++agent_ops;
442 }
443
444 /// note finish or cancellation of an async (evict) op
445 void agent_finish_evict_op() {
446 std::lock_guard l(agent_lock);
447 ceph_assert(agent_ops > 0);
448 --agent_ops;
449 agent_cond.notify_all();
450 }
451
452 /// note start of an async (flush) op
453 void agent_start_op(const hobject_t& oid) {
454 std::lock_guard l(agent_lock);
455 ++agent_ops;
456 ceph_assert(agent_oids.count(oid) == 0);
457 agent_oids.insert(oid);
458 }
459
460 /// note finish or cancellation of an async (flush) op
461 void agent_finish_op(const hobject_t& oid) {
462 std::lock_guard l(agent_lock);
463 ceph_assert(agent_ops > 0);
464 --agent_ops;
465 ceph_assert(agent_oids.count(oid) == 1);
466 agent_oids.erase(oid);
467 agent_cond.notify_all();
468 }
469
470 /// check if we are operating on an object
471 bool agent_is_active_oid(const hobject_t& oid) {
472 std::lock_guard l(agent_lock);
473 return agent_oids.count(oid);
474 }
475
476 /// get count of active agent ops
477 int agent_get_num_ops() {
478 std::lock_guard l(agent_lock);
479 return agent_ops;
480 }
481
482 void agent_inc_high_count() {
483 std::lock_guard l(agent_lock);
484 flush_mode_high_count ++;
485 }
486
487 void agent_dec_high_count() {
488 std::lock_guard l(agent_lock);
489 flush_mode_high_count --;
490 }
491
492 private:
493 /// throttle promotion attempts
494   std::atomic<unsigned int> promote_probability_millis{1000}; ///< promote probability in thousandths; kept to one word for lockless reads
495 PromoteCounter promote_counter;
496 utime_t last_recalibrate;
497 unsigned long promote_max_objects, promote_max_bytes;
498
499 public:
500 bool promote_throttle() {
501 // NOTE: lockless! we rely on the probability being a single word.
502 promote_counter.attempt();
503 if ((unsigned)rand() % 1000 > promote_probability_millis)
504 return true; // yes throttle (no promote)
505 if (promote_max_objects &&
506 promote_counter.objects > promote_max_objects)
507 return true; // yes throttle
508 if (promote_max_bytes &&
509 promote_counter.bytes > promote_max_bytes)
510 return true; // yes throttle
511 return false; // no throttle (promote)
512 }
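  /*
   * Worked example (added here for clarity, not in the original source):
   * promote_probability_millis is out of 1000, so with the default of 1000
   * the rand() % 1000 check above never throttles (999 > 1000 is false),
   * while a value of 250 throttles roughly 75% of attempts before the
   * object/byte limits are even consulted.
   */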
513 void promote_finish(uint64_t bytes) {
514 promote_counter.finish(bytes);
515 }
516 void promote_throttle_recalibrate();
517 unsigned get_num_shards() const {
518 return m_objecter_finishers;
519 }
520 Finisher* get_objecter_finisher(int shard) {
521 return objecter_finishers[shard].get();
522 }
523
524 // -- Objecter, for tiering reads/writes from/to other OSDs --
525 std::unique_ptr<Objecter> objecter;
526 int m_objecter_finishers;
527 std::vector<std::unique_ptr<Finisher>> objecter_finishers;
528
529 // -- Watch --
530 ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
531 SafeTimer watch_timer;
532 uint64_t next_notif_id;
533 uint64_t get_next_id(epoch_t cur_epoch) {
534 std::lock_guard l(watch_lock);
535 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
536 }
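  /*
   * Worked example (not in the original source): for cur_epoch == 5 and a
   * current next_notif_id of 12, get_next_id() returns
   *   (5ULL << 32) | 12 == 0x000000050000000c,
   * i.e. the osdmap epoch in the high 32 bits and a per-OSD counter in the
   * low 32 bits, keeping notify ids unique across epochs.
   */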
537
538 // -- Recovery/Backfill Request Scheduling --
539 ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
540 SafeTimer recovery_request_timer;
541
542 // For async recovery sleep
543 bool recovery_needs_sleep = true;
544 ceph::real_clock::time_point recovery_schedule_time;
545
546 // For recovery & scrub & snap
547 ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
548 SafeTimer sleep_timer;
549
550 // -- tids --
551   // for ops I issue
552 std::atomic<unsigned int> last_tid{0};
553 ceph_tid_t get_tid() {
554 return (ceph_tid_t)last_tid++;
555 }
556
557 // -- backfill_reservation --
558 Finisher reserver_finisher;
559 AsyncReserver<spg_t> local_reserver;
560 AsyncReserver<spg_t> remote_reserver;
561
562 // -- pg merge --
563 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
564 map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
565 map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
566 set<pg_t> not_ready_to_merge_source;
567 map<pg_t,pg_t> not_ready_to_merge_target;
568 set<pg_t> sent_ready_to_merge_source;
569
570 void set_ready_to_merge_source(PG *pg,
571 eversion_t version);
572 void set_ready_to_merge_target(PG *pg,
573 eversion_t version,
574 epoch_t last_epoch_started,
575 epoch_t last_epoch_clean);
576 void set_not_ready_to_merge_source(pg_t source);
577 void set_not_ready_to_merge_target(pg_t target, pg_t source);
578 void clear_ready_to_merge(PG *pg);
579 void send_ready_to_merge();
580 void _send_ready_to_merge();
581 void clear_sent_ready_to_merge();
582 void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
583
584 // -- pg_temp --
585 private:
586 ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
587 struct pg_temp_t {
588 vector<int> acting;
589 bool forced = false;
590 };
591 map<pg_t, pg_temp_t> pg_temp_wanted;
592 map<pg_t, pg_temp_t> pg_temp_pending;
593 void _sent_pg_temp();
594 friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
595 public:
596 void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
597 bool forced = false);
598 void remove_want_pg_temp(pg_t pgid);
599 void requeue_pg_temp();
600 void send_pg_temp();
601
602 ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
603 set<pg_t> pg_created;
604 void send_pg_created(pg_t pgid);
605 void prune_pg_created();
606 void send_pg_created();
607
608 AsyncReserver<spg_t> snap_reserver;
609 void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
610 void queue_for_snap_trim(PG *pg);
611 void queue_for_scrub(PG *pg, bool with_high_priority);
612 void queue_for_pg_delete(spg_t pgid, epoch_t e);
613 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
614
615 private:
616 // -- pg recovery and associated throttling --
617 ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
618 list<pair<epoch_t, PGRef> > awaiting_throttle;
619
620 utime_t defer_recovery_until;
621 uint64_t recovery_ops_active;
622 uint64_t recovery_ops_reserved;
623 bool recovery_paused;
624 #ifdef DEBUG_RECOVERY_OIDS
625 map<spg_t, set<hobject_t> > recovery_oids;
626 #endif
627 bool _recover_now(uint64_t *available_pushes);
628 void _maybe_queue_recovery();
629 void _queue_for_recovery(
630 pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
631 public:
632 void start_recovery_op(PG *pg, const hobject_t& soid);
633 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
634 bool is_recovery_active();
635 void release_reserved_pushes(uint64_t pushes);
636 void defer_recovery(float defer_for) {
637 defer_recovery_until = ceph_clock_now();
638 defer_recovery_until += defer_for;
639 }
640 void pause_recovery() {
641 std::lock_guard l(recovery_lock);
642 recovery_paused = true;
643 }
644 bool recovery_is_paused() {
645 std::lock_guard l(recovery_lock);
646 return recovery_paused;
647 }
648 void unpause_recovery() {
649 std::lock_guard l(recovery_lock);
650 recovery_paused = false;
651 _maybe_queue_recovery();
652 }
653 void kick_recovery_queue() {
654 std::lock_guard l(recovery_lock);
655 _maybe_queue_recovery();
656 }
657 void clear_queued_recovery(PG *pg) {
658 std::lock_guard l(recovery_lock);
659 awaiting_throttle.remove_if(
660 [pg](decltype(awaiting_throttle)::const_reference awaiting ) {
661 return awaiting.second.get() == pg;
662 });
663 }
664
665 unsigned get_target_pg_log_entries() const;
666
667 // delayed pg activation
668 void queue_for_recovery(PG *pg) {
669 std::lock_guard l(recovery_lock);
670
671 if (pg->is_forced_recovery_or_backfill()) {
672 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
673 } else {
674 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
675 }
676 _maybe_queue_recovery();
677 }
678 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
679 std::lock_guard l(recovery_lock);
680 _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
681 }
682
683 void queue_check_readable(spg_t spgid,
684 epoch_t lpr,
685 ceph::signedspan delay = ceph::signedspan::zero());
686
687 // osd map cache (past osd maps)
688 ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
689 SharedLRU<epoch_t, const OSDMap> map_cache;
690 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
691 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
692
693 OSDMapRef try_get_map(epoch_t e);
694 OSDMapRef get_map(epoch_t e) {
695 OSDMapRef ret(try_get_map(e));
696 ceph_assert(ret);
697 return ret;
698 }
699 OSDMapRef add_map(OSDMap *o) {
700 std::lock_guard l(map_cache_lock);
701 return _add_map(o);
702 }
703 OSDMapRef _add_map(OSDMap *o);
704
705 void _add_map_bl(epoch_t e, bufferlist& bl);
706 bool get_map_bl(epoch_t e, bufferlist& bl) {
707 std::lock_guard l(map_cache_lock);
708 return _get_map_bl(e, bl);
709 }
710 bool _get_map_bl(epoch_t e, bufferlist& bl);
711
712 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
713 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
714
715   /// identify split child pgids over an osdmap interval
716 void identify_splits_and_merges(
717 OSDMapRef old_map,
718 OSDMapRef new_map,
719 spg_t pgid,
720 set<pair<spg_t,epoch_t>> *new_children,
721 set<pair<spg_t,epoch_t>> *merge_pgs);
722
723 void need_heartbeat_peer_update();
724
725 void init();
726 void final_init();
727 void start_shutdown();
728 void shutdown_reserver();
729 void shutdown();
730
731 // -- stats --
732 ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
733 osd_stat_t osd_stat;
734 uint32_t seq = 0;
735
736 void set_statfs(const struct store_statfs_t &stbuf,
737 osd_alert_list_t& alerts);
738 osd_stat_t set_osd_stat(vector<int>& hb_peers, int num_pgs);
739 void inc_osd_stat_repaired(void);
740 float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
741 osd_stat_t get_osd_stat() {
742 std::lock_guard l(stat_lock);
743 ++seq;
744 osd_stat.up_from = up_epoch;
745 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
746 return osd_stat;
747 }
748 uint64_t get_osd_stat_seq() {
749 std::lock_guard l(stat_lock);
750 return osd_stat.seq;
751 }
752 void get_hb_pingtime(map<int, osd_stat_t::Interfaces> *pp)
753 {
754 std::lock_guard l(stat_lock);
755 *pp = osd_stat.hb_pingtime;
756 return;
757 }
758
759 // -- OSD Full Status --
760 private:
761 friend TestOpsSocketHook;
762 mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
763 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
764 const char *get_full_state_name(s_names s) const {
765 switch (s) {
766 case NONE: return "none";
767 case NEARFULL: return "nearfull";
768 case BACKFILLFULL: return "backfillfull";
769 case FULL: return "full";
770 case FAILSAFE: return "failsafe";
771 default: return "???";
772 }
773 }
774 s_names get_full_state(string type) const {
775 if (type == "none")
776 return NONE;
777 else if (type == "failsafe")
778 return FAILSAFE;
779 else if (type == "full")
780 return FULL;
781 else if (type == "backfillfull")
782 return BACKFILLFULL;
783 else if (type == "nearfull")
784 return NEARFULL;
785 else
786 return INVALID;
787 }
788 double cur_ratio, physical_ratio; ///< current utilization
789 mutable int64_t injectfull = 0;
790 s_names injectfull_state = NONE;
791 float get_failsafe_full_ratio();
792 bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
793 bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
794 public:
795 void check_full_status(float ratio, float pratio);
796 s_names recalc_full_state(float ratio, float pratio, string &inject);
797 bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
798 bool check_failsafe_full(DoutPrefixProvider *dpp) const;
799 bool check_full(DoutPrefixProvider *dpp) const;
800 bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
801 bool check_backfill_full(DoutPrefixProvider *dpp) const;
802 bool check_nearfull(DoutPrefixProvider *dpp) const;
803 bool is_failsafe_full() const;
804 bool is_full() const;
805 bool is_backfillfull() const;
806 bool is_nearfull() const;
807 bool need_fullness_update(); ///< osdmap state needs update
808 void set_injectfull(s_names type, int64_t count);
809
810
811 // -- epochs --
812 private:
813 // protects access to boot_epoch, up_epoch, bind_epoch
814 mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
815 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
816 epoch_t up_epoch; // _most_recent_ epoch we were marked up
817 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
818 public:
819 /**
820 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
821 * can be NULL if you don't care about them.
822 */
823 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
824 epoch_t *_bind_epoch) const;
825 /**
826 * Set the boot, up, and bind epochs. Any NULL params will not be set.
827 */
828 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
829 const epoch_t *_bind_epoch);
830 epoch_t get_boot_epoch() const {
831 epoch_t ret;
832 retrieve_epochs(&ret, NULL, NULL);
833 return ret;
834 }
835 epoch_t get_up_epoch() const {
836 epoch_t ret;
837 retrieve_epochs(NULL, &ret, NULL);
838 return ret;
839 }
840 epoch_t get_bind_epoch() const {
841 epoch_t ret;
842 retrieve_epochs(NULL, NULL, &ret);
843 return ret;
844 }
845
846 void request_osdmap_update(epoch_t e);
847
848 // -- heartbeats --
849   ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");
850
851 /// osd -> heartbeat stamps
852 vector<HeartbeatStampsRef> hb_stamps;
853
854 /// get or create a ref for a peer's HeartbeatStamps
855 HeartbeatStampsRef get_hb_stamps(unsigned osd);
856
857
858 // Timer for readable leases
859 ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
860
861 void queue_renew_lease(epoch_t epoch, spg_t spgid);
862
863 // -- stopping --
864 ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
865 ceph::condition_variable is_stopping_cond;
866 enum {
867 NOT_STOPPING,
868 PREPARING_TO_STOP,
869 STOPPING };
870 std::atomic<int> state{NOT_STOPPING};
871 int get_state() const {
872 return state;
873 }
874 void set_state(int s) {
875 state = s;
876 }
877 bool is_stopping() const {
878 return state == STOPPING;
879 }
880 bool is_preparing_to_stop() const {
881 return state == PREPARING_TO_STOP;
882 }
883 bool prepare_to_stop();
884 void got_stop_ack();
885
886
887 #ifdef PG_DEBUG_REFS
888 ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
889 map<spg_t, int> pgid_tracker;
890 map<spg_t, PG*> live_pgs;
891 void add_pgid(spg_t pgid, PG *pg);
892 void remove_pgid(spg_t pgid, PG *pg);
893 void dump_live_pgids();
894 #endif
895
896 explicit OSDService(OSD *osd);
897 ~OSDService() = default;
898 };
899
900 /*
901
902 Each PG slot includes queues for events that are processing and/or waiting
903 for a PG to be materialized in the slot.
904
905 These are the constraints:
906
907   - client ops must remain ordered by client, regardless of map epoch
908 - peering messages/events from peers must remain ordered by peer
909 - peering messages and client ops need not be ordered relative to each other
910
911 - some peering events can create a pg (e.g., notify)
912 - the query peering event can proceed when a PG doesn't exist
913
914 Implementation notes:
915
916 - everybody waits for split. If the OSD has the parent PG it will instantiate
917 the PGSlot early and mark it waiting_for_split. Everything will wait until
918     the parent is able to commit the split operation and the child PGs are
919 materialized in the child slots.
920
921 - every event has an epoch property and will wait for the OSDShard to catch
922 up to that epoch. For example, if we get a peering event from a future
923 epoch, the event will wait in the slot until the local OSD has caught up.
924 (We should be judicious in specifying the required epoch [by, e.g., setting
925 it to the same_interval_since epoch] so that we don't wait for epochs that
926 don't affect the given PG.)
927
928 - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
929 OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
930 peering events are queued up by epoch required.
931
932 - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
933 materialized the PG), we wake *all* waiting items. (This could be optimized,
934 probably, but we don't bother.) We always requeue peering items ahead of
935 client ops.
936
937 - some peering events are marked !peering_requires_pg (PGQuery). if we do
938 not have a PG these are processed immediately (under the shard lock).
939
940   - if we do not have a PG present, we check whether the slot maps to the current
941     host. If so, we either queue the item and wait for the PG to materialize, or
942     (if the event is a pg-creating event like PGNotify) we materialize the PG.
943
944 - when we advance the osdmap on the OSDShard, we scan pg slots and
945 discard any slots with no pg (and not waiting_for_split) that no
946 longer map to the current host.
947
948 */
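/*
 * Illustrative sketch (not from the original source) of how the wait lists
 * described above are used; `item`, `required_epoch` and `shard_epoch` are
 * hypothetical locals, while the slot fields are the real ones declared in
 * OSDShardPGSlot below:
 *
 *   if (item.is_peering() && required_epoch > shard_epoch)
 *     slot->waiting_peering[required_epoch].push_back(std::move(item));
 *   else if (!slot->pg)
 *     slot->waiting.push_back(std::move(item));     // wait for pg (or map + pg)
 *   else
 *     slot->to_process.push_back(std::move(item));  // run under the pg lock
 */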
949
950 struct OSDShardPGSlot {
951 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
952 PGRef pg; ///< pg reference
953 deque<OpSchedulerItem> to_process; ///< order items for this slot
954 int num_running = 0; ///< _process threads doing pg lookup/lock
955
956 deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
957
958 /// waiting for map (peering evt)
959 map<epoch_t,deque<OpSchedulerItem>> waiting_peering;
960
961 /// incremented by wake_pg_waiters; indicates racing _process threads
962 /// should bail out (their op has been requeued)
963 uint64_t requeue_seq = 0;
964
965 /// waiting for split child to materialize in these epoch(s)
966 set<epoch_t> waiting_for_split;
967
968 epoch_t epoch = 0;
969 boost::intrusive::set_member_hook<> pg_epoch_item;
970
971 /// waiting for a merge (source or target) by this epoch
972 epoch_t waiting_for_merge_epoch = 0;
973 };
974
975 struct OSDShard {
976 const unsigned shard_id;
977 CephContext *cct;
978 OSD *osd;
979
980 string shard_name;
981
982 string sdata_wait_lock_name;
983 ceph::mutex sdata_wait_lock;
984 ceph::condition_variable sdata_cond;
985
986 ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
987 OSDMapRef shard_osdmap;
988
989 OSDMapRef get_osdmap() {
990 std::lock_guard l(osdmap_lock);
991 return shard_osdmap;
992 }
993
994 string shard_lock_name;
995 ceph::mutex shard_lock; ///< protects remaining members below
996
997 /// map of slots for each spg_t. maintains ordering of items dequeued
998 /// from scheduler while _process thread drops shard lock to acquire the
999 /// pg lock. stale slots are removed by consume_map.
1000 unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
1001
1002 struct pg_slot_compare_by_epoch {
1003 bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
1004 return l.epoch < r.epoch;
1005 }
1006 };
1007
1008 /// maintain an ordering of pg slots by pg epoch
1009 boost::intrusive::multiset<
1010 OSDShardPGSlot,
1011 boost::intrusive::member_hook<
1012 OSDShardPGSlot,
1013 boost::intrusive::set_member_hook<>,
1014 &OSDShardPGSlot::pg_epoch_item>,
1015 boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
1016 int waiting_for_min_pg_epoch = 0;
1017 ceph::condition_variable min_pg_epoch_cond;
1018
1019 /// priority queue
1020 ceph::osd::scheduler::OpSchedulerRef scheduler;
1021
1022 bool stop_waiting = false;
1023
1024 ContextQueue context_queue;
1025
1026 void _attach_pg(OSDShardPGSlot *slot, PG *pg);
1027 void _detach_pg(OSDShardPGSlot *slot);
1028
1029 void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
1030 epoch_t get_min_pg_epoch();
1031 void wait_min_pg_epoch(epoch_t need);
1032
1033 /// return newest epoch we are waiting for
1034 epoch_t get_max_waiting_epoch();
1035
1036 /// push osdmap into shard
1037 void consume_map(
1038 const OSDMapRef& osdmap,
1039 unsigned *pushes_to_free);
1040
1041 void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
1042
1043 void identify_splits_and_merges(
1044 const OSDMapRef& as_of_osdmap,
1045 set<pair<spg_t,epoch_t>> *split_children,
1046 set<pair<spg_t,epoch_t>> *merge_pgs);
1047 void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
1048 void prime_splits(const OSDMapRef& as_of_osdmap,
1049 set<pair<spg_t,epoch_t>> *pgids);
1050 void prime_merges(const OSDMapRef& as_of_osdmap,
1051 set<pair<spg_t,epoch_t>> *merge_pgs);
1052 void register_and_wake_split_child(PG *pg);
1053 void unprime_split_children(spg_t parent, unsigned old_pg_num);
1054
1055 OSDShard(
1056 int id,
1057 CephContext *cct,
1058 OSD *osd);
1059 };
1060
1061 class OSD : public Dispatcher,
1062 public md_config_obs_t {
1063 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
1064
1065 /** OSD **/
1066 // global lock
1067 ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
1068 SafeTimer tick_timer; // safe timer (osd_lock)
1069
1070   // Tick timer for work that does not need osd_lock
1071 ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
1072 SafeTimer tick_timer_without_osd_lock;
1073 std::string gss_ktfile_client{};
1074
1075 public:
1076 // config observer bits
1077 const char** get_tracked_conf_keys() const override;
1078 void handle_conf_change(const ConfigProxy& conf,
1079 const std::set <std::string> &changed) override;
1080 void update_log_config();
1081 void check_config();
1082
1083 protected:
1084
1085 const double OSD_TICK_INTERVAL = { 1.0 };
1086 double get_tick_interval() const;
1087
1088 Messenger *cluster_messenger;
1089 Messenger *client_messenger;
1090 Messenger *objecter_messenger;
1091 MonClient *monc; // check the "monc helpers" list before accessing directly
1092 MgrClient mgrc;
1093 PerfCounters *logger;
1094 PerfCounters *recoverystate_perf;
1095 ObjectStore *store;
1096 #ifdef HAVE_LIBFUSE
1097 FuseStore *fuse_store = nullptr;
1098 #endif
1099 LogClient log_client;
1100 LogChannelRef clog;
1101
1102 int whoami;
1103 std::string dev_path, journal_path;
1104
1105 ceph_release_t last_require_osd_release{ceph_release_t::unknown};
1106
1107 int numa_node = -1;
1108 size_t numa_cpu_set_size = 0;
1109 cpu_set_t numa_cpu_set;
1110
1111 bool store_is_rotational = true;
1112 bool journal_is_rotational = true;
1113
1114 ZTracer::Endpoint trace_endpoint;
1115 void create_logger();
1116 void create_recoverystate_perf();
1117 void tick();
1118 void tick_without_osd_lock();
1119 void _dispatch(Message *m);
1120 void dispatch_op(OpRequestRef op);
1121
1122 void check_osdmap_features();
1123
1124 // asok
1125 friend class OSDSocketHook;
1126 class OSDSocketHook *asok_hook;
1127 void asok_command(
1128 std::string_view prefix,
1129 const cmdmap_t& cmdmap,
1130 Formatter *f,
1131 const bufferlist& inbl,
1132 std::function<void(int,const std::string&,bufferlist&)> on_finish);
1133
1134 public:
1135 int get_nodeid() { return whoami; }
1136
1137 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1138 char foo[20];
1139 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1140 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1141 }
1142 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1143 char foo[22];
1144 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1145 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1146 }
1147
1148 static ghobject_t make_snapmapper_oid() {
1149 return ghobject_t(hobject_t(
1150 sobject_t(
1151 object_t("snapmapper"),
1152 0)));
1153 }
1154 static ghobject_t make_purged_snaps_oid() {
1155 return ghobject_t(hobject_t(
1156 sobject_t(
1157 object_t("purged_snaps"),
1158 0)));
1159 }
1160
1161 static ghobject_t make_pg_log_oid(spg_t pg) {
1162 stringstream ss;
1163 ss << "pglog_" << pg;
1164 string s;
1165 getline(ss, s);
1166 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1167 }
1168
1169 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1170 stringstream ss;
1171 ss << "pginfo_" << pg;
1172 string s;
1173 getline(ss, s);
1174 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1175 }
1176 static ghobject_t make_infos_oid() {
1177 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1178 return ghobject_t(oid);
1179 }
1180
1181 static ghobject_t make_final_pool_info_oid(int64_t pool) {
1182 return ghobject_t(
1183 hobject_t(
1184 sobject_t(
1185 object_t(string("final_pool_") + stringify(pool)),
1186 CEPH_NOSNAP)));
1187 }
1188
1189 static ghobject_t make_pg_num_history_oid() {
1190 return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
1191 }
1192
1193 static void recursive_remove_collection(CephContext* cct,
1194 ObjectStore *store,
1195 spg_t pgid,
1196 coll_t tmp);
1197
1198 /**
1199 * get_osd_initial_compat_set()
1200 *
1201 * Get the initial feature set for this OSD. Features
1202 * here are automatically upgraded.
1203 *
1204 * Return value: Initial osd CompatSet
1205 */
1206 static CompatSet get_osd_initial_compat_set();
1207
1208 /**
1209 * get_osd_compat_set()
1210 *
1211 * Get all features supported by this OSD
1212 *
1213 * Return value: CompatSet of all supported features
1214 */
1215 static CompatSet get_osd_compat_set();
1216
1217
1218 private:
1219 class C_Tick;
1220 class C_Tick_WithoutOSDLock;
1221
1222 // -- config settings --
1223 float m_osd_pg_epoch_max_lag_factor;
1224
1225 // -- superblock --
1226 OSDSuperblock superblock;
1227
1228 void write_superblock();
1229 void write_superblock(ObjectStore::Transaction& t);
1230 int read_superblock();
1231
1232 void clear_temp_objects();
1233
1234 CompatSet osd_compat;
1235
1236 // -- state --
1237 public:
1238 typedef enum {
1239 STATE_INITIALIZING = 1,
1240 STATE_PREBOOT,
1241 STATE_BOOTING,
1242 STATE_ACTIVE,
1243 STATE_STOPPING,
1244 STATE_WAITING_FOR_HEALTHY
1245 } osd_state_t;
1246
1247 static const char *get_state_name(int s) {
1248 switch (s) {
1249 case STATE_INITIALIZING: return "initializing";
1250 case STATE_PREBOOT: return "preboot";
1251 case STATE_BOOTING: return "booting";
1252 case STATE_ACTIVE: return "active";
1253 case STATE_STOPPING: return "stopping";
1254 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1255 default: return "???";
1256 }
1257 }
1258
1259 private:
1260 std::atomic<int> state{STATE_INITIALIZING};
1261
1262 public:
1263 int get_state() const {
1264 return state;
1265 }
1266 void set_state(int s) {
1267 state = s;
1268 }
1269 bool is_initializing() const {
1270 return state == STATE_INITIALIZING;
1271 }
1272 bool is_preboot() const {
1273 return state == STATE_PREBOOT;
1274 }
1275 bool is_booting() const {
1276 return state == STATE_BOOTING;
1277 }
1278 bool is_active() const {
1279 return state == STATE_ACTIVE;
1280 }
1281 bool is_stopping() const {
1282 return state == STATE_STOPPING;
1283 }
1284 bool is_waiting_for_healthy() const {
1285 return state == STATE_WAITING_FOR_HEALTHY;
1286 }
1287
1288 private:
1289
1290 ShardedThreadPool osd_op_tp;
1291
1292 void get_latest_osdmap();
1293
1294 // -- sessions --
1295 private:
1296 void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
1297
1298 ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
1299 set<ceph::ref_t<Session>> session_waiting_for_map;
1300
1301 /// Caller assumes refs for included Sessions
1302 void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) {
1303 std::lock_guard l(session_waiting_lock);
1304 out->swap(session_waiting_for_map);
1305 }
1306 void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1307 std::lock_guard l(session_waiting_lock);
1308 session_waiting_for_map.insert(session);
1309 }
1310 void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1311 std::lock_guard l(session_waiting_lock);
1312 session_waiting_for_map.erase(session);
1313 }
1314 void dispatch_sessions_waiting_on_map() {
1315 set<ceph::ref_t<Session>> sessions_to_check;
1316 get_sessions_waiting_for_map(&sessions_to_check);
1317 for (auto i = sessions_to_check.begin();
1318 i != sessions_to_check.end();
1319 sessions_to_check.erase(i++)) {
1320 std::lock_guard l{(*i)->session_dispatch_lock};
1321 dispatch_session_waiting(*i, get_osdmap());
1322 }
1323 }
1324 void session_handle_reset(const ceph::ref_t<Session>& session) {
1325 std::lock_guard l(session->session_dispatch_lock);
1326 clear_session_waiting_on_map(session);
1327
1328 session->clear_backoffs();
1329
1330 /* Messages have connection refs, we need to clear the
1331 * connection->session->message->connection
1332 * cycles which result.
1333 * Bug #12338
1334 */
1335 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1336 }
1337
1338 private:
1339 /**
1340 * @defgroup monc helpers
1341 * @{
1342 * Right now we only have the one
1343 */
1344
1345 /**
1346 * Ask the Monitors for a sequence of OSDMaps.
1347 *
1348 * @param epoch The epoch to start with when replying
1349 * @param force_request True if this request forces a new subscription to
1350 * the monitors; false if an outstanding request that encompasses it is
1351 * sufficient.
1352 */
1353 void osdmap_subscribe(version_t epoch, bool force_request);
1354 /** @} monc helpers */
1355
1356 ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
1357 epoch_t latest_subscribed_epoch{0};
1358
1359 // -- heartbeat --
1360 /// information about a heartbeat peer
1361 struct HeartbeatInfo {
1362 int peer; ///< peer
1363 ConnectionRef con_front; ///< peer connection (front)
1364 ConnectionRef con_back; ///< peer connection (back)
1365 utime_t first_tx; ///< time we sent our first ping request
1366 utime_t last_tx; ///< last time we sent a ping request
1367 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1368 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1369 epoch_t epoch; ///< most recent epoch we wanted this peer
1370 /// number of connections we send and receive heartbeat pings/replies
1371 static constexpr int HEARTBEAT_MAX_CONN = 2;
1372     /// history of inflight pings, arranged by the timestamp we sent
1373 /// send time -> deadline -> remaining replies
1374 map<utime_t, pair<utime_t, int>> ping_history;
1375
1376 utime_t hb_interval_start;
1377 uint32_t hb_average_count = 0;
1378 uint32_t hb_index = 0;
1379
1380 uint32_t hb_total_back = 0;
1381 uint32_t hb_min_back = UINT_MAX;
1382 uint32_t hb_max_back = 0;
1383 vector<uint32_t> hb_back_pingtime;
1384 vector<uint32_t> hb_back_min;
1385 vector<uint32_t> hb_back_max;
1386
1387 uint32_t hb_total_front = 0;
1388 uint32_t hb_min_front = UINT_MAX;
1389 uint32_t hb_max_front = 0;
1390 vector<uint32_t> hb_front_pingtime;
1391 vector<uint32_t> hb_front_min;
1392 vector<uint32_t> hb_front_max;
1393
1394 bool is_stale(utime_t stale) {
1395 if (ping_history.empty()) {
1396 return false;
1397 }
1398 utime_t oldest_deadline = ping_history.begin()->second.first;
1399 return oldest_deadline <= stale;
1400 }
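    /*
     * Worked example (not in the original source): if the oldest entry in
     * ping_history carries a deadline of t=120 and the caller passes
     * stale == 130, is_stale() returns true, because that ping is still
     * unacknowledged past its deadline and beyond the stale cutoff.
     */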
1401
1402 bool is_unhealthy(utime_t now) {
1403 if (ping_history.empty()) {
1404         /// we haven't sent a ping yet or we have received all replies;
1405         /// either way we are safe and healthy for now
1406 return false;
1407 }
1408
1409 utime_t oldest_deadline = ping_history.begin()->second.first;
1410 return now > oldest_deadline;
1411 }
1412
1413 bool is_healthy(utime_t now) {
1414 if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
1415         // only declare ourselves healthy once we have received the first
1416         // replies from both front/back connections
1417 return false;
1418 }
1419 return !is_unhealthy(now);
1420 }
1421
1422 void clear_mark_down(Connection *except = nullptr) {
1423 if (con_back && con_back != except) {
1424 con_back->mark_down();
1425 con_back->clear_priv();
1426 con_back.reset(nullptr);
1427 }
1428 if (con_front && con_front != except) {
1429 con_front->mark_down();
1430 con_front->clear_priv();
1431 con_front.reset(nullptr);
1432 }
1433 }
1434 };
1435
1436 ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
1437 map<int, int> debug_heartbeat_drops_remaining;
1438 ceph::condition_variable heartbeat_cond;
1439 bool heartbeat_stop;
1440 std::atomic<bool> heartbeat_need_update;
1441 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1442 utime_t last_mon_heartbeat;
1443 Messenger *hb_front_client_messenger;
1444 Messenger *hb_back_client_messenger;
1445 Messenger *hb_front_server_messenger;
1446 Messenger *hb_back_server_messenger;
1447 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1448 double daily_loadavg;
1449 ceph::mono_time startup_time;
1450
1451   // Track ping response times using a vector as a circular buffer
1452 // MUST BE A POWER OF 2
1453 const uint32_t hb_vector_size = 16;
1454
1455 void _add_heartbeat_peer(int p);
1456 void _remove_heartbeat_peer(int p);
1457 bool heartbeat_reset(Connection *con);
1458 void maybe_update_heartbeat_peers();
1459 void reset_heartbeat_peers(bool all);
1460 bool heartbeat_peers_need_update() {
1461 return heartbeat_need_update.load();
1462 }
1463 void heartbeat_set_peers_need_update() {
1464 heartbeat_need_update.store(true);
1465 }
1466 void heartbeat_clear_peers_need_update() {
1467 heartbeat_need_update.store(false);
1468 }
1469 void heartbeat();
1470 void heartbeat_check();
1471 void heartbeat_entry();
1472 void need_heartbeat_peer_update();
1473
1474 void heartbeat_kick() {
1475 std::lock_guard l(heartbeat_lock);
1476 heartbeat_cond.notify_all();
1477 }
1478
1479 struct T_Heartbeat : public Thread {
1480 OSD *osd;
1481 explicit T_Heartbeat(OSD *o) : osd(o) {}
1482 void *entry() override {
1483 osd->heartbeat_entry();
1484 return 0;
1485 }
1486 } heartbeat_thread;
1487
1488 public:
1489 bool heartbeat_dispatch(Message *m);
1490
1491 struct HeartbeatDispatcher : public Dispatcher {
1492 OSD *osd;
1493 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1494
1495 bool ms_can_fast_dispatch_any() const override { return true; }
1496 bool ms_can_fast_dispatch(const Message *m) const override {
1497 switch (m->get_type()) {
1498 case CEPH_MSG_PING:
1499 case MSG_OSD_PING:
1500 return true;
1501 default:
1502 return false;
1503 }
1504 }
1505 void ms_fast_dispatch(Message *m) override {
1506 osd->heartbeat_dispatch(m);
1507 }
1508 bool ms_dispatch(Message *m) override {
1509 return osd->heartbeat_dispatch(m);
1510 }
1511 bool ms_handle_reset(Connection *con) override {
1512 return osd->heartbeat_reset(con);
1513 }
1514 void ms_handle_remote_reset(Connection *con) override {}
1515 bool ms_handle_refused(Connection *con) override {
1516 return osd->ms_handle_refused(con);
1517 }
1518 int ms_handle_authentication(Connection *con) override {
1519 return true;
1520 }
1521 } heartbeat_dispatcher;
1522
1523 private:
1524 // -- waiters --
1525 list<OpRequestRef> finished;
1526
1527 void take_waiters(list<OpRequestRef>& ls) {
1528 ceph_assert(ceph_mutex_is_locked(osd_lock));
1529 finished.splice(finished.end(), ls);
1530 }
1531 void do_waiters();
1532
1533 // -- op tracking --
1534 OpTracker op_tracker;
1535 void test_ops(std::string command, std::string args, ostream& ss);
1536 friend class TestOpsSocketHook;
1537 TestOpsSocketHook *test_ops_hook;
1538 friend struct C_FinishSplits;
1539 friend struct C_OpenPGs;
1540
1541 protected:
1542
1543 /*
1544 * The ordered op delivery chain is:
1545 *
1546 * fast dispatch -> scheduler back
1547 * scheduler front <-> to_process back
1548 * to_process front -> RunVis(item)
1549 * <- queue_front()
1550 *
1551 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1552 * pushed back up into to_process and/or scheduler while order is preserved.
1553 *
1554 * Multiple worker threads can operate on each shard.
1555 *
1556 * Under normal circumstances, num_running == to_process.size(). There are
1557 * two times when that is not true: (1) when waiting_for_pg == true and
1558 * to_process is accumulating requests that are waiting for the pg to be
1559 * instantiated; in that case they will all get requeued together by
1560    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared
1561    * waiting_for_pg, and already requeued the items.
1562 */
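  /*
   * Illustrative sketch (not part of the original header) of the chain above
   * for a client op; details such as OpSchedulerItem construction are
   * simplified and should be read as assumptions:
   *
   *   // fast dispatch -> scheduler back
   *   enqueue_op(pgid, std::move(op), epoch);      // wraps the op and calls
   *   service.enqueue_back(std::move(item));       //   op_shardedwq._enqueue()
   *
   *   // scheduler front <-> to_process back, to_process front -> RunVis(item)
   *   // all happen inside ShardedOpWQ::_process(); requeues preserving order
   *   // go through _enqueue_front() / queue_front().
   */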
1563 friend class ceph::osd::scheduler::PGOpItem;
1564 friend class ceph::osd::scheduler::PGPeeringItem;
1565 friend class ceph::osd::scheduler::PGRecovery;
1566 friend class ceph::osd::scheduler::PGDelete;
1567
1568 class ShardedOpWQ
1569 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
1570 {
1571 OSD *osd;
1572
1573 public:
1574 ShardedOpWQ(OSD *o,
1575 time_t ti,
1576 time_t si,
1577 ShardedThreadPool* tp)
1578 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
1579 osd(o) {
1580 }
1581
1582 void _add_slot_waiter(
1583 spg_t token,
1584 OSDShardPGSlot *slot,
1585 OpSchedulerItem&& qi);
1586
1587 /// try to do some work
1588 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1589
1590 /// enqueue a new item
1591 void _enqueue(OpSchedulerItem&& item) override;
1592
1593 /// requeue an old item (at the front of the line)
1594 void _enqueue_front(OpSchedulerItem&& item) override;
1595
1596 void return_waiting_threads() override {
1597 for(uint32_t i = 0; i < osd->num_shards; i++) {
1598 OSDShard* sdata = osd->shards[i];
1599 assert (NULL != sdata);
1600 std::scoped_lock l{sdata->sdata_wait_lock};
1601 sdata->stop_waiting = true;
1602 sdata->sdata_cond.notify_all();
1603 }
1604 }
1605
1606 void stop_return_waiting_threads() override {
1607 for(uint32_t i = 0; i < osd->num_shards; i++) {
1608 OSDShard* sdata = osd->shards[i];
1609 assert (NULL != sdata);
1610 std::scoped_lock l{sdata->sdata_wait_lock};
1611 sdata->stop_waiting = false;
1612 }
1613 }
1614
1615 void dump(Formatter *f) {
1616 for(uint32_t i = 0; i < osd->num_shards; i++) {
1617 auto &&sdata = osd->shards[i];
1618
1619 char queue_name[32] = {0};
1620 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1621 ceph_assert(NULL != sdata);
1622
1623 std::scoped_lock l{sdata->shard_lock};
1624 f->open_object_section(queue_name);
1625 sdata->scheduler->dump(*f);
1626 f->close_section();
1627 }
1628 }
1629
1630 bool is_shard_empty(uint32_t thread_index) override {
1631 uint32_t shard_index = thread_index % osd->num_shards;
1632 auto &&sdata = osd->shards[shard_index];
1633 ceph_assert(sdata);
1634 std::lock_guard l(sdata->shard_lock);
1635 if (thread_index < osd->num_shards) {
1636 return sdata->scheduler->empty() && sdata->context_queue.empty();
1637 } else {
1638 return sdata->scheduler->empty();
1639 }
1640 }
1641
1642 void handle_oncommits(list<Context*>& oncommits) {
1643 for (auto p : oncommits) {
1644 p->complete(0);
1645 }
1646 }
1647 } op_shardedwq;
1648
1649
1650 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
1651 void dequeue_op(
1652 PGRef pg, OpRequestRef op,
1653 ThreadPool::TPHandle &handle);
1654
1655 void enqueue_peering_evt(
1656 spg_t pgid,
1657 PGPeeringEventRef ref);
1658 void dequeue_peering_evt(
1659 OSDShard *sdata,
1660 PG *pg,
1661 PGPeeringEventRef ref,
1662 ThreadPool::TPHandle& handle);
1663
1664 void dequeue_delete(
1665 OSDShard *sdata,
1666 PG *pg,
1667 epoch_t epoch,
1668 ThreadPool::TPHandle& handle);
1669
1670 friend class PG;
1671 friend class OSDShard;
1672 friend class PrimaryLogPG;
1673
1674
1675 protected:
1676
1677 // -- osd map --
1678   // TODO: switch to std::atomic<OSDMapRef> when C++20 is available.
1679 OSDMapRef _osdmap;
1680 void set_osdmap(OSDMapRef osdmap) {
1681 std::atomic_store(&_osdmap, osdmap);
1682 }
1683 OSDMapRef get_osdmap() const {
1684 return std::atomic_load(&_osdmap);
1685 }
1686 epoch_t get_osdmap_epoch() const {
1687 // XXX: performance?
1688 auto osdmap = get_osdmap();
1689 return osdmap ? osdmap->get_epoch() : 0;
1690 }
1691
1692 pool_pg_num_history_t pg_num_history;
1693
1694 ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
1695 list<OpRequestRef> waiting_for_osdmap;
1696 deque<utime_t> osd_markdown_log;
1697
1698 friend struct send_map_on_destruct;
1699
1700 void wait_for_new_map(OpRequestRef op);
1701 void handle_osd_map(class MOSDMap *m);
1702 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1703 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1704 void note_down_osd(int osd);
1705 void note_up_osd(int osd);
1706 friend class C_OnMapCommit;
1707
1708 bool advance_pg(
1709 epoch_t advance_to,
1710 PG *pg,
1711 ThreadPool::TPHandle &handle,
1712 PeeringCtx &rctx);
1713 void consume_map();
1714 void activate_map();
1715
1716 // osd map cache (past osd maps)
1717 OSDMapRef get_map(epoch_t e) {
1718 return service.get_map(e);
1719 }
1720 OSDMapRef add_map(OSDMap *o) {
1721 return service.add_map(o);
1722 }
1723 bool get_map_bl(epoch_t e, bufferlist& bl) {
1724 return service.get_map_bl(e, bl);
1725 }
1726
1727 public:
1728 // -- shards --
1729 vector<OSDShard*> shards;
1730 uint32_t num_shards = 0;
1731
1732 void inc_num_pgs() {
1733 ++num_pgs;
1734 }
1735 void dec_num_pgs() {
1736 --num_pgs;
1737 }
1738 int get_num_pgs() const {
1739 return num_pgs;
1740 }
1741
1742 protected:
1743 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
1744 /// merge epoch -> target pgid -> source pgid -> pg
1745 map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
1746
1747 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1748 unsigned need);
1749
1750 // -- placement groups --
1751 std::atomic<size_t> num_pgs = {0};
1752
1753 std::mutex pending_creates_lock;
1754 using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
1755 std::set<create_from_osd_t> pending_creates_from_osd;
1756 unsigned pending_creates_from_mon = 0;
1757
1758 PGRecoveryStats pg_recovery_stats;
1759
1760 PGRef _lookup_pg(spg_t pgid);
1761 PGRef _lookup_lock_pg(spg_t pgid);
1762 void register_pg(PGRef pg);
1763 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
1764
1765 void _get_pgs(vector<PGRef> *v, bool clear_too=false);
1766 void _get_pgids(vector<spg_t> *v);
1767
1768 public:
1769 PGRef lookup_lock_pg(spg_t pgid);
1770
1771 std::set<int64_t> get_mapped_pools();
1772
1773 protected:
1774 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
1775
1776 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1777 spg_t pgid, bool is_mon_create);
1778 void resume_creating_pg();
1779
1780 void load_pgs();
1781
1782 /// build initial pg history and intervals on create
1783 void build_initial_pg_history(
1784 spg_t pgid,
1785 epoch_t created,
1786 utime_t created_stamp,
1787 pg_history_t *h,
1788 PastIntervals *pi);
1789
1790 epoch_t last_pg_create_epoch;
1791
1792 void handle_pg_create(OpRequestRef op);
1793
1794 void split_pgs(
1795 PG *parent,
1796 const set<spg_t> &childpgids, set<PGRef> *out_pgs,
1797 OSDMapRef curmap,
1798 OSDMapRef nextmap,
1799 PeeringCtx &rctx);
1800 void _finish_splits(set<PGRef>& pgs);
1801
1802 // == monitor interaction ==
1803 ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
1804 utime_t last_mon_report;
1805 Finisher boot_finisher;
1806
1807 // -- boot --
1808 void start_boot();
1809 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
1810 void _preboot(epoch_t oldest, epoch_t newest);
1811 void _send_boot();
1812 void _collect_metadata(map<string,string> *pmeta);
1813 void _get_purged_snaps();
1814 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
1815
1816 void start_waiting_for_healthy();
1817 bool _is_healthy();
1818
1819 void send_full_update();
1820
1821 friend struct C_OSD_GetVersion;
1822
1823 // -- alive --
1824 epoch_t up_thru_wanted;
1825
1826 void queue_want_up_thru(epoch_t want);
1827 void send_alive();
1828
1829 // -- full map requests --
1830 epoch_t requested_full_first, requested_full_last;
1831
1832 void request_full_map(epoch_t first, epoch_t last);
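// Re-issue the previously requested range of full maps; the bookkeeping is
// cleared first, presumably so request_full_map() does not skip the range as
// already requested.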
1833 void rerequest_full_maps() {
1834 epoch_t first = requested_full_first;
1835 epoch_t last = requested_full_last;
1836 requested_full_first = 0;
1837 requested_full_last = 0;
1838 request_full_map(first, last);
1839 }
1840 void got_full_map(epoch_t e);
1841
1842 // -- failures --
1843 map<int,utime_t> failure_queue;
1844 map<int,pair<utime_t,entity_addrvec_t> > failure_pending;
1845
1846 void requeue_failures();
1847 void send_failures();
1848 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
1849 void cancel_pending_failures();
1850
1851 ceph::coarse_mono_clock::time_point last_sent_beacon;
1852 ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
1853 epoch_t min_last_epoch_clean = 0;
1854 // which pgs were scanned for min_lec
1855 std::vector<pg_t> min_last_epoch_clean_pgs;
1856 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
1857
1858 ceph_tid_t get_tid() {
1859 return service.get_tid();
1860 }
1861
1862 double scrub_sleep_time(bool must_scrub);
1863
1864 // -- generic pg peering --
1865 PeeringCtx create_context();
1866 void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
1867 ThreadPool::TPHandle *handle = NULL);
1868
1869 bool require_mon_peer(const Message *m);
1870 bool require_mon_or_mgr_peer(const Message *m);
1871 bool require_osd_peer(const Message *m);
1872 /**
1873 * Verifies that we were alive in the given epoch, and that
1874 * we still are.
1875 */
1876 bool require_self_aliveness(const Message *m, epoch_t alive_since);
1877 /**
1878 * Verifies that the OSD that sent the given message has the same
1879 * address as in the given map.
1880 * @pre the message was sent by an OSD using the cluster messenger
1881 */
1882 bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
1883 bool is_fast_dispatch);
1884
1885 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
1886 bool is_fast_dispatch);
1887
1888 void handle_fast_pg_create(MOSDPGCreate2 *m);
1889 void handle_fast_pg_query(MOSDPGQuery *m);
1890 void handle_pg_query_nopg(const MQuery& q);
1891 void handle_fast_pg_notify(MOSDPGNotify *m);
1892 void handle_pg_notify_nopg(const MNotifyRec& q);
1893 void handle_fast_pg_info(MOSDPGInfo *m);
1894 void handle_fast_pg_remove(MOSDPGRemove *m);
1895
1896 public:
1897 // used by OSDShard
1898 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
1899 protected:
1900
1901 void handle_fast_force_recovery(MOSDForceRecovery *m);
1902
1903 // -- commands --
1904 void handle_command(class MCommand *m);
1905
1906
1907 // -- pg recovery --
1908 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
1909 ThreadPool::TPHandle &handle);
1910
1911
1912 // -- scrubbing --
1913 void sched_scrub();
1914 void resched_all_scrubs();
1915 bool scrub_random_backoff();
1916 bool scrub_load_below_threshold();
1917 bool scrub_time_permit(utime_t now);
1918
1919 // -- status reporting --
1920 MPGStats *collect_pg_stats();
1921 std::vector<DaemonHealthMetric> get_health_metrics();
1922
1923
1924 private:
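// Messenger Dispatcher hooks. Message types accepted by ms_can_fast_dispatch()
// are delivered via ms_fast_dispatch() directly from the messenger thread;
// all other messages go through the ordinary ms_dispatch() path.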
1925 bool ms_can_fast_dispatch_any() const override { return true; }
1926 bool ms_can_fast_dispatch(const Message *m) const override {
1927 switch (m->get_type()) {
1928 case CEPH_MSG_PING:
1929 case CEPH_MSG_OSD_OP:
1930 case CEPH_MSG_OSD_BACKOFF:
1931 case MSG_OSD_SCRUB2:
1932 case MSG_OSD_FORCE_RECOVERY:
1933 case MSG_MON_COMMAND:
1934 case MSG_OSD_PG_CREATE2:
1935 case MSG_OSD_PG_QUERY:
1936 case MSG_OSD_PG_QUERY2:
1937 case MSG_OSD_PG_INFO:
1938 case MSG_OSD_PG_INFO2:
1939 case MSG_OSD_PG_NOTIFY:
1940 case MSG_OSD_PG_NOTIFY2:
1941 case MSG_OSD_PG_LOG:
1942 case MSG_OSD_PG_TRIM:
1943 case MSG_OSD_PG_REMOVE:
1944 case MSG_OSD_BACKFILL_RESERVE:
1945 case MSG_OSD_RECOVERY_RESERVE:
1946 case MSG_OSD_REPOP:
1947 case MSG_OSD_REPOPREPLY:
1948 case MSG_OSD_PG_PUSH:
1949 case MSG_OSD_PG_PULL:
1950 case MSG_OSD_PG_PUSH_REPLY:
1951 case MSG_OSD_PG_SCAN:
1952 case MSG_OSD_PG_BACKFILL:
1953 case MSG_OSD_PG_BACKFILL_REMOVE:
1954 case MSG_OSD_EC_WRITE:
1955 case MSG_OSD_EC_WRITE_REPLY:
1956 case MSG_OSD_EC_READ:
1957 case MSG_OSD_EC_READ_REPLY:
1958 case MSG_OSD_SCRUB_RESERVE:
1959 case MSG_OSD_REP_SCRUB:
1960 case MSG_OSD_REP_SCRUBMAP:
1961 case MSG_OSD_PG_UPDATE_LOG_MISSING:
1962 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
1963 case MSG_OSD_PG_RECOVERY_DELETE:
1964 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
1965 case MSG_OSD_PG_LEASE:
1966 case MSG_OSD_PG_LEASE_ACK:
1967 return true;
1968 default:
1969 return false;
1970 }
1971 }
1972 void ms_fast_dispatch(Message *m) override;
1973 bool ms_dispatch(Message *m) override;
1974 void ms_handle_connect(Connection *con) override;
1975 void ms_handle_fast_connect(Connection *con) override;
1976 void ms_handle_fast_accept(Connection *con) override;
1977 int ms_handle_authentication(Connection *con) override;
1978 bool ms_handle_reset(Connection *con) override;
1979 void ms_handle_remote_reset(Connection *con) override {}
1980 bool ms_handle_refused(Connection *con) override;
1981
1982 public:
1983 /* internal and external can point to the same messenger; they will still
1984 * be cleaned up properly */
1985 OSD(CephContext *cct_,
1986 ObjectStore *store_,
1987 int id,
1988 Messenger *internal,
1989 Messenger *external,
1990 Messenger *hb_front_client,
1991 Messenger *hb_back_client,
1992 Messenger *hb_front_server,
1993 Messenger *hb_back_server,
1994 Messenger *osdc_messenger,
1995 MonClient *mc, const std::string &dev, const std::string &jdev);
1996 ~OSD() override;
1997
1998 // static bits
1999 static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, std::string osdspec_affinity);
2000
2001 /* remove any non-user xattrs (keys not prefixed with '_') from the given attr map */
2002 void filter_xattrs(map<string, bufferptr>& attrs) {
2003 for (map<string, bufferptr>::iterator iter = attrs.begin();
2004 iter != attrs.end();
2005 ) {
2006 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2007 attrs.erase(iter++);
2008 else ++iter;
2009 }
2010 }
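// Behaviour sketch with illustrative keys: given {"_user.foo", "_", "ceph.x"},
// filter_xattrs() keeps only "_user.foo"; a key survives only if it starts
// with '_' and is longer than one character.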
2011
2012 private:
2013 int mon_cmd_maybe_osd_create(string &cmd);
2014 int update_crush_device_class();
2015 int update_crush_location();
2016
2017 static int write_meta(CephContext *cct,
2018 ObjectStore *store,
2019 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
2020
2021 void handle_scrub(struct MOSDScrub *m);
2022 void handle_fast_scrub(struct MOSDScrub2 *m);
2023 void handle_osd_ping(class MOSDPing *m);
2024
2025 size_t get_num_cache_shards();
2026 int get_num_op_shards();
2027 int get_num_op_threads();
2028
2029 float get_osd_recovery_sleep();
2030 float get_osd_delete_sleep();
2031 float get_osd_snap_trim_sleep();
2032
2033 int get_recovery_max_active();
2034
2035 void scrub_purged_snaps();
2036 void probe_smart(const string& devid, ostream& ss);
2037
2038 public:
2039 static int peek_meta(ObjectStore *store,
2040 string *magic,
2041 uuid_d *cluster_fsid,
2042 uuid_d *osd_fsid,
2043 int *whoami,
2044 ceph_release_t *min_osd_release);
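// Call sketch (illustrative local names, mirroring the signature above):
//   std::string magic; uuid_d cluster_fsid, osd_fsid; int whoami;
//   ceph_release_t min_release;
//   int r = OSD::peek_meta(store, &magic, &cluster_fsid, &osd_fsid,
//                          &whoami, &min_release);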
2045
2046
2047 // startup/shutdown
2048 int pre_init();
2049 int init();
2050 void final_init();
2051
2052 int enable_disable_fuse(bool stop);
2053 int set_numa_affinity();
2054
2055 void suicide(int exitcode);
2056 int shutdown();
2057
2058 void handle_signal(int signum);
2059
2060 /// check if we can throw out op from a disconnected client
2061 static bool op_is_discardable(const MOSDOp *m);
2062
2063 public:
2064 OSDService service;
2065 friend class OSDService;
2066
2067 private:
2068 void set_perf_queries(const ConfigPayload &config_payload);
2069 MetricPayload get_perf_reports();
2070
2071 ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
2072 std::list<OSDPerfMetricQuery> m_perf_queries;
2073 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
2074 };
2075
2076
2077 // compatibility features of the executable
2078 extern const CompatSet::Feature ceph_osd_feature_compat[];
2079 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2080 extern const CompatSet::Feature ceph_osd_feature_incompat[];
2081
2082 #endif // CEPH_OSD_H