1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_H
16 #define CEPH_OSD_H
17
18 #include "PG.h"
19
20 #include "msg/Dispatcher.h"
21
22 #include "common/async/context_pool.h"
23 #include "common/Timer.h"
24 #include "common/WorkQueue.h"
25 #include "common/AsyncReserver.h"
26 #include "common/ceph_context.h"
27 #include "common/config_cacher.h"
28 #include "common/zipkin_trace.h"
29 #include "common/ceph_timer.h"
30
31 #include "mgr/MgrClient.h"
32
33 #include "os/ObjectStore.h"
34
35 #include "include/CompatSet.h"
36 #include "include/common_fwd.h"
37
38 #include "OpRequest.h"
39 #include "Session.h"
40
41 #include "osd/scheduler/OpScheduler.h"
42
43 #include <atomic>
44 #include <map>
45 #include <memory>
46 #include <string>
47
48 #include "include/unordered_map.h"
49
50 #include "common/shared_cache.hpp"
51 #include "common/simple_cache.hpp"
52 #include "messages/MOSDOp.h"
53 #include "common/EventTrace.h"
54 #include "osd/osd_perf_counters.h"
55 #include "common/Finisher.h"
56 #include "scrubber/osd_scrub_sched.h"
57
58 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
59
60 /*
61
62 lock ordering for pg map
63
64 PG::lock
65 ShardData::lock
66 OSD::pg_map_lock
67
68 */
69
70 class Messenger;
71 class Message;
72 class MonClient;
73 class ObjectStore;
74 class FuseStore;
75 class OSDMap;
76 class MLog;
77 class Objecter;
78 class KeyStore;
79
80 class Watch;
81 class PrimaryLogPG;
82
83 class TestOpsSocketHook;
84 struct C_FinishSplits;
85 struct C_OpenPGs;
86 class LogChannel;
87
88 class MOSDPGCreate2;
89 class MOSDPGNotify;
90 class MOSDPGInfo;
91 class MOSDPGRemove;
92 class MOSDForceRecovery;
93 class MMonGetPurgedSnapsReply;
94
95 class OSD;
96
97 class OSDService {
98 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
99 public:
100 OSD *osd;
101 CephContext *cct;
102 ObjectStore::CollectionHandle meta_ch;
103 const int whoami;
104 ObjectStore * const store;
105 LogClient &log_client;
106 LogChannelRef clog;
107 PGRecoveryStats &pg_recovery_stats;
108 private:
109 Messenger *&cluster_messenger;
110 Messenger *&client_messenger;
111 public:
112 PerfCounters *&logger;
113 PerfCounters *&recoverystate_perf;
114 MonClient *&monc;
115
116 md_config_cacher_t<Option::size_t> osd_max_object_size;
117 md_config_cacher_t<bool> osd_skip_data_digest;
118
119 void enqueue_back(OpSchedulerItem&& qi);
120 void enqueue_front(OpSchedulerItem&& qi);
121
122 void maybe_inject_dispatch_delay() {
123 if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
124 if (rand() % 10000 <
125 g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
126 utime_t t;
127 t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
128 t.sleep();
129 }
130 }
131 }
132
133 ceph::signedspan get_mnow();
134
135 private:
136 // -- superblock --
137 ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
138 OSDSuperblock superblock;
139
140 public:
141 OSDSuperblock get_superblock() {
142 std::lock_guard l(publish_lock);
143 return superblock;
144 }
145 void publish_superblock(const OSDSuperblock &block) {
146 std::lock_guard l(publish_lock);
147 superblock = block;
148 }
149
150 int get_nodeid() const { return whoami; }
151
152 std::atomic<epoch_t> max_oldest_map;
153 private:
154 OSDMapRef osdmap;
155
156 public:
157 OSDMapRef get_osdmap() {
158 std::lock_guard l(publish_lock);
159 return osdmap;
160 }
161 epoch_t get_osdmap_epoch() {
162 std::lock_guard l(publish_lock);
163 return osdmap ? osdmap->get_epoch() : 0;
164 }
165 void publish_map(OSDMapRef map) {
166 std::lock_guard l(publish_lock);
167 osdmap = map;
168 }
169
170 /*
171 * osdmap - current published map
172 * next_osdmap - pre-published map that is about to be published.
173 *
174 * We use the next_osdmap to send messages and initiate connections,
175 * but only if the target is the same instance as the one in the map
176 * epoch the current user is working from (i.e., the result is
177 * equivalent to what is in next_osdmap).
178 *
179 * This allows the helpers to start ignoring osds that are about to
180 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
181 * down, without worrying about reopening connections from threads
182 * working from old maps.
183 */
184 private:
185 OSDMapRef next_osdmap;
186 ceph::condition_variable pre_publish_cond;
187 int pre_publish_waiter = 0;
188
189 public:
190 void pre_publish_map(OSDMapRef map) {
191 std::lock_guard l(pre_publish_lock);
192 next_osdmap = std::move(map);
193 }
194
195 void activate_map();
196 /// map epochs reserved below
197 std::map<epoch_t, unsigned> map_reservations;
198
199 /// gets ref to next_osdmap and registers the epoch as reserved
200 OSDMapRef get_nextmap_reserved() {
201 std::lock_guard l(pre_publish_lock);
202 epoch_t e = next_osdmap->get_epoch();
203 std::map<epoch_t, unsigned>::iterator i =
204 map_reservations.insert(std::make_pair(e, 0)).first;
205 i->second++;
206 return next_osdmap;
207 }
208 /// releases reservation on map
209 void release_map(OSDMapRef osdmap) {
210 std::lock_guard l(pre_publish_lock);
211 std::map<epoch_t, unsigned>::iterator i =
212 map_reservations.find(osdmap->get_epoch());
213 ceph_assert(i != map_reservations.end());
214 ceph_assert(i->second > 0);
215 if (--(i->second) == 0) {
216 map_reservations.erase(i);
217 }
218 if (pre_publish_waiter) {
219 pre_publish_cond.notify_all();
220 }
221 }
222 /// blocks until there are no reserved maps prior to next_osdmap
223 void await_reserved_maps() {
224 std::unique_lock l{pre_publish_lock};
225 ceph_assert(next_osdmap);
226 pre_publish_waiter++;
227 pre_publish_cond.wait(l, [this] {
228 auto i = map_reservations.cbegin();
229 return (i == map_reservations.cend() ||
230 i->first >= next_osdmap->get_epoch());
231 });
232 pre_publish_waiter--;
233 }
234 OSDMapRef get_next_osdmap() {
235 std::lock_guard l(pre_publish_lock);
236 return next_osdmap;
237 }
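  /*
   * Illustrative usage of the reservation helpers above (a sketch only, not a
   * prescribed contract): reserve the epoch, use the map, then release it.
   *
   *   OSDMapRef next = get_nextmap_reserved();   // registers next_osdmap's epoch
   *   // ... look up peers / open connections against 'next' ...
   *   release_map(next);                         // drops the reservation
   *
   * await_reserved_maps() can then be used to block until no epoch older than
   * next_osdmap is still reserved.
   */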
238
239 void maybe_share_map(Connection *con,
240 const OSDMapRef& osdmap,
241 epoch_t peer_epoch_lb=0);
242
243 void send_map(class MOSDMap *m, Connection *con);
244 void send_incremental_map(epoch_t since, Connection *con,
245 const OSDMapRef& osdmap);
246 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
247 OSDSuperblock& superblock);
248
249 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
250 std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
251 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
252 void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
253 void send_message_osd_cluster(MessageRef m, Connection *con) {
254 con->send_message2(std::move(m));
255 }
256 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
257 con->send_message(m);
258 }
259 void send_message_osd_client(Message *m, const ConnectionRef& con) {
260 con->send_message(m);
261 }
262 entity_name_t get_cluster_msgr_name() const;
263
264
265 public:
266
267 void reply_op_error(OpRequestRef op, int err);
268 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
269 std::vector<pg_log_op_return_item_t> op_returns);
270 void handle_misdirected_op(PG *pg, OpRequestRef op);
271
272 private:
273 /**
274 * The entity that maintains the set of PGs we may scrub (i.e. - those for
275 * which we are the primary), and schedules their scrubbing.
276 */
277 ScrubQueue m_scrub_queue;
278
279 public:
280 ScrubQueue& get_scrub_services() { return m_scrub_queue; }
281
282 /**
283 * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
284 *
285 * The request might fail for multiple reasons, as ScrubQueue cannot on its own
286 * check some of the PG-specific preconditions; those are checked here. See the
287 * schedule_result_t definition.
288 *
289 * @param pgid to scrub
290 * @param allow_requested_repair_only
291 * @return a Scrub::schedule_result_t detailing either a success, or the failure reason.
292 */
293 Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);
294
295
296 private:
297 // -- agent shared state --
298 ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
299 ceph::condition_variable agent_cond;
300 std::map<uint64_t, std::set<PGRef> > agent_queue;
301 std::set<PGRef>::iterator agent_queue_pos;
302 bool agent_valid_iterator;
303 int agent_ops;
304 int flush_mode_high_count; // once at least one pg is in FLUSH_MODE_HIGH, flush objects at high speed
305 std::set<hobject_t> agent_oids;
306 bool agent_active;
307 struct AgentThread : public Thread {
308 OSDService *osd;
309 explicit AgentThread(OSDService *o) : osd(o) {}
310 void *entry() override {
311 osd->agent_entry();
312 return NULL;
313 }
314 } agent_thread;
315 bool agent_stop_flag;
316 ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
317 SafeTimer agent_timer;
318
319 public:
320 void agent_entry();
321 void agent_stop();
322
323 void _enqueue(PG *pg, uint64_t priority) {
324 if (!agent_queue.empty() &&
325 agent_queue.rbegin()->first < priority)
326 agent_valid_iterator = false; // inserting higher-priority queue
327 std::set<PGRef>& nq = agent_queue[priority];
328 if (nq.empty())
329 agent_cond.notify_all();
330 nq.insert(pg);
331 }
332
333 void _dequeue(PG *pg, uint64_t old_priority) {
334 std::set<PGRef>& oq = agent_queue[old_priority];
335 std::set<PGRef>::iterator p = oq.find(pg);
336 ceph_assert(p != oq.end());
337 if (p == agent_queue_pos)
338 ++agent_queue_pos;
339 oq.erase(p);
340 if (oq.empty()) {
341 if (agent_queue.rbegin()->first == old_priority)
342 agent_valid_iterator = false;
343 agent_queue.erase(old_priority);
344 }
345 }
346
347 /// enable agent for a pg
348 void agent_enable_pg(PG *pg, uint64_t priority) {
349 std::lock_guard l(agent_lock);
350 _enqueue(pg, priority);
351 }
352
353 /// adjust priority for an enabled pg
354 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
355 std::lock_guard l(agent_lock);
356 ceph_assert(new_priority != old_priority);
357 _enqueue(pg, new_priority);
358 _dequeue(pg, old_priority);
359 }
360
361 /// disable agent for a pg
362 void agent_disable_pg(PG *pg, uint64_t old_priority) {
363 std::lock_guard l(agent_lock);
364 _dequeue(pg, old_priority);
365 }
366
367 /// note start of an async (evict) op
368 void agent_start_evict_op() {
369 std::lock_guard l(agent_lock);
370 ++agent_ops;
371 }
372
373 /// note finish or cancellation of an async (evict) op
374 void agent_finish_evict_op() {
375 std::lock_guard l(agent_lock);
376 ceph_assert(agent_ops > 0);
377 --agent_ops;
378 agent_cond.notify_all();
379 }
380
381 /// note start of an async (flush) op
382 void agent_start_op(const hobject_t& oid) {
383 std::lock_guard l(agent_lock);
384 ++agent_ops;
385 ceph_assert(agent_oids.count(oid) == 0);
386 agent_oids.insert(oid);
387 }
388
389 /// note finish or cancellation of an async (flush) op
390 void agent_finish_op(const hobject_t& oid) {
391 std::lock_guard l(agent_lock);
392 ceph_assert(agent_ops > 0);
393 --agent_ops;
394 ceph_assert(agent_oids.count(oid) == 1);
395 agent_oids.erase(oid);
396 agent_cond.notify_all();
397 }
398
399 /// check if we are operating on an object
400 bool agent_is_active_oid(const hobject_t& oid) {
401 std::lock_guard l(agent_lock);
402 return agent_oids.count(oid);
403 }
404
405 /// get count of active agent ops
406 int agent_get_num_ops() {
407 std::lock_guard l(agent_lock);
408 return agent_ops;
409 }
410
411 void agent_inc_high_count() {
412 std::lock_guard l(agent_lock);
413 flush_mode_high_count ++;
414 }
415
416 void agent_dec_high_count() {
417 std::lock_guard l(agent_lock);
418 flush_mode_high_count --;
419 }
420
421 private:
422 /// throttle promotion attempts
423 std::atomic<unsigned int> promote_probability_millis{1000}; ///< promote probability, in thousandths; kept to a single word so it can be read locklessly
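  // Illustrative note: promote_throttle() below compares rand() % 1000 against
  // this value, so e.g. 250 lets roughly 25% of attempts pass the random check,
  // while the default of 1000 never throttles on that check alone.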
424 PromoteCounter promote_counter;
425 utime_t last_recalibrate;
426 unsigned long promote_max_objects, promote_max_bytes;
427
428 public:
429 bool promote_throttle() {
430 // NOTE: lockless! we rely on the probability being a single word.
431 promote_counter.attempt();
432 if ((unsigned)rand() % 1000 > promote_probability_millis)
433 return true; // yes throttle (no promote)
434 if (promote_max_objects &&
435 promote_counter.objects > promote_max_objects)
436 return true; // yes throttle
437 if (promote_max_bytes &&
438 promote_counter.bytes > promote_max_bytes)
439 return true; // yes throttle
440 return false; // no throttle (promote)
441 }
442 void promote_finish(uint64_t bytes) {
443 promote_counter.finish(bytes);
444 }
445 void promote_throttle_recalibrate();
446 unsigned get_num_shards() const {
447 return m_objecter_finishers;
448 }
449 Finisher* get_objecter_finisher(int shard) {
450 return objecter_finishers[shard].get();
451 }
452
453 // -- Objecter, for tiering reads/writes from/to other OSDs --
454 ceph::async::io_context_pool& poolctx;
455 std::unique_ptr<Objecter> objecter;
456 int m_objecter_finishers;
457 std::vector<std::unique_ptr<Finisher>> objecter_finishers;
458
459 // -- Watch --
460 ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
461 SafeTimer watch_timer;
462 uint64_t next_notif_id;
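  /// Note: ids returned by get_next_id() pack the current epoch into the high
  /// 32 bits and the per-OSD counter into the low 32 bits, e.g. epoch 5 and
  /// counter 7 yield (5ull << 32) | 7.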
463 uint64_t get_next_id(epoch_t cur_epoch) {
464 std::lock_guard l(watch_lock);
465 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
466 }
467
468 // -- Recovery/Backfill Request Scheduling --
469 ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
470 SafeTimer recovery_request_timer;
471
472 // For async recovery sleep
473 bool recovery_needs_sleep = true;
474 ceph::real_clock::time_point recovery_schedule_time;
475
476 // For recovery & scrub & snap
477 ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
478 SafeTimer sleep_timer;
479
480 // -- tids --
481 // for ops i issue
482 std::atomic<unsigned int> last_tid{0};
483 ceph_tid_t get_tid() {
484 return (ceph_tid_t)last_tid++;
485 }
486
487 // -- backfill_reservation --
488 Finisher reserver_finisher;
489 AsyncReserver<spg_t, Finisher> local_reserver;
490 AsyncReserver<spg_t, Finisher> remote_reserver;
491
492 // -- pg merge --
493 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
494 std::map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
495 std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
496 std::set<pg_t> not_ready_to_merge_source;
497 std::map<pg_t,pg_t> not_ready_to_merge_target;
498 std::set<pg_t> sent_ready_to_merge_source;
499
500 void set_ready_to_merge_source(PG *pg,
501 eversion_t version);
502 void set_ready_to_merge_target(PG *pg,
503 eversion_t version,
504 epoch_t last_epoch_started,
505 epoch_t last_epoch_clean);
506 void set_not_ready_to_merge_source(pg_t source);
507 void set_not_ready_to_merge_target(pg_t target, pg_t source);
508 void clear_ready_to_merge(PG *pg);
509 void send_ready_to_merge();
510 void _send_ready_to_merge();
511 void clear_sent_ready_to_merge();
512 void prune_sent_ready_to_merge(const OSDMapRef& osdmap);
513
514 // -- pg_temp --
515 private:
516 ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
517 struct pg_temp_t {
518 std::vector<int> acting;
519 bool forced = false;
520 };
521 std::map<pg_t, pg_temp_t> pg_temp_wanted;
522 std::map<pg_t, pg_temp_t> pg_temp_pending;
523 void _sent_pg_temp();
524 friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
525 public:
526 void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
527 bool forced = false);
528 void remove_want_pg_temp(pg_t pgid);
529 void requeue_pg_temp();
530 void send_pg_temp();
531
532 ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
533 std::set<pg_t> pg_created;
534 void send_pg_created(pg_t pgid);
535 void prune_pg_created();
536 void send_pg_created();
537
538 AsyncReserver<spg_t, Finisher> snap_reserver;
539 void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
540 void queue_for_snap_trim(PG *pg);
541 void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
542
543 void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
544
545 /// queue the message (-> event) that all replicas have reserved scrub resources for us
546 void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);
547
548 /// queue the message (-> event) that some replicas denied our scrub resources request
549 void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);
550
551 /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
552 /// of the primary map being created by the backend.
553 void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);
554
555 /// Signals a change in the number of in-flight recovery writes
556 void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);
557
558 /// Signals that all pending updates were applied
559 void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);
560
561 /// Signals that the selected chunk (objects range) is available for scrubbing
562 void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);
563
564 /// The chunk selected is blocked by user operations, and cannot be scrubbed now
565 void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);
566
567 /// The block-range that was locked and prevented the scrubbing is now freed
568 void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);
569
570 /// Signals that all write OPs are done
571 void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);
572
573 /// Signals that the local (Primary's) scrub map is ready
574 void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);
575
576 /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
577 void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);
578
579 /// Signals that all chunks were handled
580 /// Note: always with high priority, as it must be acted upon before the
581 /// next scrub request arrives from the Primary (and the primary is free
582 /// to send the request once the replica's map is received).
583 void queue_scrub_is_finished(PG* pg);
584
585 /// Signals that there are more chunks to handle
586 void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);
587
588 /// Signals that we have finished comparing the maps for this chunk
589 /// Note: required, as in Crimson this operation is 'futurized'.
590 void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);
591
592 void queue_for_rep_scrub(PG* pg,
593 Scrub::scrub_prio_t with_high_priority,
594 unsigned int qu_priority,
595 Scrub::act_token_t act_token);
596
597 /// Signals a change in the number of in-flight recovery writes
598 void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);
599
600 /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
601 /// trigger a re-check of the availability of the scrub map prepared by the
602 /// backend.
603 void queue_for_rep_scrub_resched(PG* pg,
604 Scrub::scrub_prio_t with_high_priority,
605 unsigned int qu_priority,
606 Scrub::act_token_t act_token);
607
608 void queue_for_pg_delete(spg_t pgid, epoch_t e);
609 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
610
611 private:
612 // -- pg recovery and associated throttling --
613 ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
614 std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;
615
616 /// queue a scrub-related message for a PG
617 template <class MSG_TYPE>
618 void queue_scrub_event_msg(PG* pg,
619 Scrub::scrub_prio_t with_priority,
620 unsigned int qu_priority,
621 Scrub::act_token_t act_token);
622
623 /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
624 /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
625 template <class MSG_TYPE>
626 void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);
627
628 utime_t defer_recovery_until;
629 uint64_t recovery_ops_active;
630 uint64_t recovery_ops_reserved;
631 bool recovery_paused;
632 #ifdef DEBUG_RECOVERY_OIDS
633 std::map<spg_t, std::set<hobject_t> > recovery_oids;
634 #endif
635 bool _recover_now(uint64_t *available_pushes);
636 void _maybe_queue_recovery();
637 void _queue_for_recovery(
638 std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
639 public:
640 void start_recovery_op(PG *pg, const hobject_t& soid);
641 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
642 bool is_recovery_active();
643 void release_reserved_pushes(uint64_t pushes);
644 void defer_recovery(float defer_for) {
645 defer_recovery_until = ceph_clock_now();
646 defer_recovery_until += defer_for;
647 }
648 void pause_recovery() {
649 std::lock_guard l(recovery_lock);
650 recovery_paused = true;
651 }
652 bool recovery_is_paused() {
653 std::lock_guard l(recovery_lock);
654 return recovery_paused;
655 }
656 void unpause_recovery() {
657 std::lock_guard l(recovery_lock);
658 recovery_paused = false;
659 _maybe_queue_recovery();
660 }
661 void kick_recovery_queue() {
662 std::lock_guard l(recovery_lock);
663 _maybe_queue_recovery();
664 }
665 void clear_queued_recovery(PG *pg) {
666 std::lock_guard l(recovery_lock);
667 awaiting_throttle.remove_if(
668 [pg](decltype(awaiting_throttle)::const_reference awaiting ) {
669 return awaiting.second.get() == pg;
670 });
671 }
672
673 unsigned get_target_pg_log_entries() const;
674
675 // delayed pg activation
676 void queue_for_recovery(PG *pg) {
677 std::lock_guard l(recovery_lock);
678
679 if (pg->is_forced_recovery_or_backfill()) {
680 awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
681 } else {
682 awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
683 }
684 _maybe_queue_recovery();
685 }
686 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
687 std::lock_guard l(recovery_lock);
688 _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
689 }
690
691 void queue_check_readable(spg_t spgid,
692 epoch_t lpr,
693 ceph::signedspan delay = ceph::signedspan::zero());
694
695 // osd map cache (past osd maps)
696 ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
697 SharedLRU<epoch_t, const OSDMap> map_cache;
698 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
699 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;
700
701 OSDMapRef try_get_map(epoch_t e);
702 OSDMapRef get_map(epoch_t e) {
703 OSDMapRef ret(try_get_map(e));
704 ceph_assert(ret);
705 return ret;
706 }
707 OSDMapRef add_map(OSDMap *o) {
708 std::lock_guard l(map_cache_lock);
709 return _add_map(o);
710 }
711 OSDMapRef _add_map(OSDMap *o);
712
713 void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
714 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
715 std::lock_guard l(map_cache_lock);
716 return _get_map_bl(e, bl);
717 }
718 bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);
719
720 void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
721 bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);
722
723 /// identify split child pgids over an osdmap interval
724 void identify_splits_and_merges(
725 OSDMapRef old_map,
726 OSDMapRef new_map,
727 spg_t pgid,
728 std::set<std::pair<spg_t,epoch_t>> *new_children,
729 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
730
731 void need_heartbeat_peer_update();
732
733 void init();
734 void final_init();
735 void start_shutdown();
736 void shutdown_reserver();
737 void shutdown();
738
739 // -- stats --
740 ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
741 osd_stat_t osd_stat;
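  /// low 32 bits of the published stat sequence; get_osd_stat() packs up_from
  /// into the high 32 bits of osd_stat.seq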
742 uint32_t seq = 0;
743
744 void set_statfs(const struct store_statfs_t &stbuf,
745 osd_alert_list_t& alerts);
746 osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
747 void inc_osd_stat_repaired(void);
748 float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
749 osd_stat_t get_osd_stat() {
750 std::lock_guard l(stat_lock);
751 ++seq;
752 osd_stat.up_from = up_epoch;
753 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
754 return osd_stat;
755 }
756 uint64_t get_osd_stat_seq() {
757 std::lock_guard l(stat_lock);
758 return osd_stat.seq;
759 }
760 void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
761 {
762 std::lock_guard l(stat_lock);
763 *pp = osd_stat.hb_pingtime;
764 return;
765 }
766
767 // -- OSD Full Status --
768 private:
769 friend TestOpsSocketHook;
770 mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
771 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
772 const char *get_full_state_name(s_names s) const {
773 switch (s) {
774 case NONE: return "none";
775 case NEARFULL: return "nearfull";
776 case BACKFILLFULL: return "backfillfull";
777 case FULL: return "full";
778 case FAILSAFE: return "failsafe";
779 default: return "???";
780 }
781 }
782 s_names get_full_state(std::string type) const {
783 if (type == "none")
784 return NONE;
785 else if (type == "failsafe")
786 return FAILSAFE;
787 else if (type == "full")
788 return FULL;
789 else if (type == "backfillfull")
790 return BACKFILLFULL;
791 else if (type == "nearfull")
792 return NEARFULL;
793 else
794 return INVALID;
795 }
796 double cur_ratio, physical_ratio; ///< current utilization
797 mutable int64_t injectfull = 0;
798 s_names injectfull_state = NONE;
799 float get_failsafe_full_ratio();
800 bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
801 bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
802 public:
803 void check_full_status(float ratio, float pratio);
804 s_names recalc_full_state(float ratio, float pratio, std::string &inject);
805 bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
806 bool check_failsafe_full(DoutPrefixProvider *dpp) const;
807 bool check_full(DoutPrefixProvider *dpp) const;
808 bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
809 bool check_backfill_full(DoutPrefixProvider *dpp) const;
810 bool check_nearfull(DoutPrefixProvider *dpp) const;
811 bool is_failsafe_full() const;
812 bool is_full() const;
813 bool is_backfillfull() const;
814 bool is_nearfull() const;
815 bool need_fullness_update(); ///< osdmap state needs update
816 void set_injectfull(s_names type, int64_t count);
817
818
819 // -- epochs --
820 private:
821 // protects access to boot_epoch, up_epoch, bind_epoch
822 mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
823 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
824 epoch_t up_epoch; // _most_recent_ epoch we were marked up
825 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
826 public:
827 /**
828 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
829 * can be NULL if you don't care about them.
830 */
831 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
832 epoch_t *_bind_epoch) const;
833 /**
834 * Set the boot, up, and bind epochs. Any NULL params will not be set.
835 */
836 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
837 const epoch_t *_bind_epoch);
838 epoch_t get_boot_epoch() const {
839 epoch_t ret;
840 retrieve_epochs(&ret, NULL, NULL);
841 return ret;
842 }
843 epoch_t get_up_epoch() const {
844 epoch_t ret;
845 retrieve_epochs(NULL, &ret, NULL);
846 return ret;
847 }
848 epoch_t get_bind_epoch() const {
849 epoch_t ret;
850 retrieve_epochs(NULL, NULL, &ret);
851 return ret;
852 }
853
854 void request_osdmap_update(epoch_t e);
855
856 // -- heartbeats --
857 ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");
858
859 /// osd -> heartbeat stamps
860 std::vector<HeartbeatStampsRef> hb_stamps;
861
862 /// get or create a ref for a peer's HeartbeatStamps
863 HeartbeatStampsRef get_hb_stamps(unsigned osd);
864
865
866 // Timer for readable leases
867 ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
868
869 void queue_renew_lease(epoch_t epoch, spg_t spgid);
870
871 // -- stopping --
872 ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
873 ceph::condition_variable is_stopping_cond;
874 enum {
875 NOT_STOPPING,
876 PREPARING_TO_STOP,
877 STOPPING };
878 std::atomic<int> state{NOT_STOPPING};
879 int get_state() const {
880 return state;
881 }
882 void set_state(int s) {
883 state = s;
884 }
885 bool is_stopping() const {
886 return state == STOPPING;
887 }
888 bool is_preparing_to_stop() const {
889 return state == PREPARING_TO_STOP;
890 }
891 bool prepare_to_stop();
892 void got_stop_ack();
893
894
895 #ifdef PG_DEBUG_REFS
896 ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
897 std::map<spg_t, int> pgid_tracker;
898 std::map<spg_t, PG*> live_pgs;
899 void add_pgid(spg_t pgid, PG *pg);
900 void remove_pgid(spg_t pgid, PG *pg);
901 void dump_live_pgids();
902 #endif
903
904 explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
905 ~OSDService() = default;
906 };
907
908 /*
909
910 Each PG slot includes queues for events that are being processed and/or waiting
911 for a PG to be materialized in the slot.
912
913 These are the constraints:
914
915 - client ops must remain ordered by client, regardless of map epoch
916 - peering messages/events from peers must remain ordered by peer
917 - peering messages and client ops need not be ordered relative to each other
918
919 - some peering events can create a pg (e.g., notify)
920 - the query peering event can proceed when a PG doesn't exist
921
922 Implementation notes:
923
924 - everybody waits for split. If the OSD has the parent PG it will instantiate
925 the PGSlot early and mark it waiting_for_split. Everything will wait until
926 the parent is able to commit the split operation and the child PGs are
927 materialized in the child slots.
928
929 - every event has an epoch property and will wait for the OSDShard to catch
930 up to that epoch. For example, if we get a peering event from a future
931 epoch, the event will wait in the slot until the local OSD has caught up.
932 (We should be judicious in specifying the required epoch [by, e.g., setting
933 it to the same_interval_since epoch] so that we don't wait for epochs that
934 don't affect the given PG.)
935
936 - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
937 OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
938 peering events are queued up by epoch required.
939
940 - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
941 materialized the PG), we wake *all* waiting items. (This could be optimized,
942 probably, but we don't bother.) We always requeue peering items ahead of
943 client ops.
944
945 - some peering events are marked !peering_requires_pg (PGQuery). if we do
946 not have a PG these are processed immediately (under the shard lock).
947
948 - if we do not have a PG present, we check if the slot maps to the current host.
949 if so, we either queue the item and wait for the PG to materialize, or
950 (if the event is a pg creating event like PGNotify), we materialize the PG.
951
952 - when we advance the osdmap on the OSDShard, we scan pg slots and
953 discard any slots with no pg (and not waiting_for_split) that no
954 longer map to the current host.
955
956 */
957
958 struct OSDShardPGSlot {
959 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
960 PGRef pg; ///< pg reference
961 std::deque<OpSchedulerItem> to_process; ///< order items for this slot
962 int num_running = 0; ///< _process threads doing pg lookup/lock
963
964 std::deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
965
966 /// waiting for map (peering evt)
967 std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;
968
969 /// incremented by wake_pg_waiters; indicates racing _process threads
970 /// should bail out (their op has been requeued)
971 uint64_t requeue_seq = 0;
972
973 /// waiting for split child to materialize in these epoch(s)
974 std::set<epoch_t> waiting_for_split;
975
976 epoch_t epoch = 0;
977 boost::intrusive::set_member_hook<> pg_epoch_item;
978
979 /// waiting for a merge (source or target) by this epoch
980 epoch_t waiting_for_merge_epoch = 0;
981 };
982
983 struct OSDShard {
984 const unsigned shard_id;
985 CephContext *cct;
986 OSD *osd;
987
988 std::string shard_name;
989
990 std::string sdata_wait_lock_name;
991 ceph::mutex sdata_wait_lock;
992 ceph::condition_variable sdata_cond;
993 int waiting_threads = 0;
994
995 ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
996 OSDMapRef shard_osdmap;
997
998 OSDMapRef get_osdmap() {
999 std::lock_guard l(osdmap_lock);
1000 return shard_osdmap;
1001 }
1002
1003 std::string shard_lock_name;
1004 ceph::mutex shard_lock; ///< protects remaining members below
1005
1006 /// map of slots for each spg_t. maintains ordering of items dequeued
1007 /// from scheduler while _process thread drops shard lock to acquire the
1008 /// pg lock. stale slots are removed by consume_map.
1009 std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;
1010
1011 struct pg_slot_compare_by_epoch {
1012 bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
1013 return l.epoch < r.epoch;
1014 }
1015 };
1016
1017 /// maintain an ordering of pg slots by pg epoch
1018 boost::intrusive::multiset<
1019 OSDShardPGSlot,
1020 boost::intrusive::member_hook<
1021 OSDShardPGSlot,
1022 boost::intrusive::set_member_hook<>,
1023 &OSDShardPGSlot::pg_epoch_item>,
1024 boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
1025 int waiting_for_min_pg_epoch = 0;
1026 ceph::condition_variable min_pg_epoch_cond;
1027
1028 /// priority queue
1029 ceph::osd::scheduler::OpSchedulerRef scheduler;
1030
1031 bool stop_waiting = false;
1032
1033 ContextQueue context_queue;
1034
1035 void _attach_pg(OSDShardPGSlot *slot, PG *pg);
1036 void _detach_pg(OSDShardPGSlot *slot);
1037
1038 void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
1039 epoch_t get_min_pg_epoch();
1040 void wait_min_pg_epoch(epoch_t need);
1041
1042 /// return newest epoch we are waiting for
1043 epoch_t get_max_waiting_epoch();
1044
1045 /// push osdmap into shard
1046 void consume_map(
1047 const OSDMapRef& osdmap,
1048 unsigned *pushes_to_free);
1049
1050 int _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
1051
1052 void identify_splits_and_merges(
1053 const OSDMapRef& as_of_osdmap,
1054 std::set<std::pair<spg_t,epoch_t>> *split_children,
1055 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
1056 void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
1057 void prime_splits(const OSDMapRef& as_of_osdmap,
1058 std::set<std::pair<spg_t,epoch_t>> *pgids);
1059 void prime_merges(const OSDMapRef& as_of_osdmap,
1060 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
1061 void register_and_wake_split_child(PG *pg);
1062 void unprime_split_children(spg_t parent, unsigned old_pg_num);
1063 void update_scheduler_config();
1064 std::string get_scheduler_type();
1065
1066 OSDShard(
1067 int id,
1068 CephContext *cct,
1069 OSD *osd);
1070 };
1071
1072 class OSD : public Dispatcher,
1073 public md_config_obs_t {
1074 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
1075
1076 /** OSD **/
1077 // global lock
1078 ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
1079 SafeTimer tick_timer; // safe timer (osd_lock)
1080
1081 // Tick timer for tasks that do not need osd_lock
1082 ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
1083 SafeTimer tick_timer_without_osd_lock;
1084 std::string gss_ktfile_client{};
1085
1086 public:
1087 // config observer bits
1088 const char** get_tracked_conf_keys() const override;
1089 void handle_conf_change(const ConfigProxy& conf,
1090 const std::set <std::string> &changed) override;
1091 void update_log_config();
1092 void check_config();
1093
1094 protected:
1095
1096 const double OSD_TICK_INTERVAL = { 1.0 };
1097 double get_tick_interval() const;
1098
1099 Messenger *cluster_messenger;
1100 Messenger *client_messenger;
1101 Messenger *objecter_messenger;
1102 MonClient *monc; // check the "monc helpers" list before accessing directly
1103 MgrClient mgrc;
1104 PerfCounters *logger;
1105 PerfCounters *recoverystate_perf;
1106 std::unique_ptr<ObjectStore> store;
1107 #ifdef HAVE_LIBFUSE
1108 FuseStore *fuse_store = nullptr;
1109 #endif
1110 LogClient log_client;
1111 LogChannelRef clog;
1112
1113 int whoami;
1114 std::string dev_path, journal_path;
1115
1116 ceph_release_t last_require_osd_release{ceph_release_t::unknown};
1117
1118 int numa_node = -1;
1119 size_t numa_cpu_set_size = 0;
1120 cpu_set_t numa_cpu_set;
1121
1122 bool store_is_rotational = true;
1123 bool journal_is_rotational = true;
1124
1125 ZTracer::Endpoint trace_endpoint;
1126 PerfCounters* create_logger();
1127 PerfCounters* create_recoverystate_perf();
1128 void tick();
1129 void tick_without_osd_lock();
1130 void _dispatch(Message *m);
1131 void dispatch_op(OpRequestRef op);
1132
1133 void check_osdmap_features();
1134
1135 // asok
1136 friend class OSDSocketHook;
1137 class OSDSocketHook *asok_hook;
1138 using PGRefOrError = std::tuple<std::optional<PGRef>, int>;
1139 PGRefOrError locate_asok_target(const cmdmap_t& cmdmap,
1140 std::stringstream& ss, bool only_primary);
1141 int asok_route_to_pg(bool only_primary,
1142 std::string_view prefix,
1143 cmdmap_t cmdmap,
1144 Formatter *f,
1145 std::stringstream& ss,
1146 const bufferlist& inbl,
1147 bufferlist& outbl,
1148 std::function<void(int, const std::string&, bufferlist&)> on_finish);
1149 void asok_command(
1150 std::string_view prefix,
1151 const cmdmap_t& cmdmap,
1152 ceph::Formatter *f,
1153 const ceph::buffer::list& inbl,
1154 std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
1155
1156 public:
1157 int get_nodeid() { return whoami; }
1158
1159 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1160 char foo[20];
1161 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1162 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1163 }
1164 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1165 char foo[22];
1166 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1167 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1168 }
1169
1170 static ghobject_t make_snapmapper_oid() {
1171 return ghobject_t(hobject_t(
1172 sobject_t(
1173 object_t("snapmapper"),
1174 0)));
1175 }
1176 static ghobject_t make_purged_snaps_oid() {
1177 return ghobject_t(hobject_t(
1178 sobject_t(
1179 object_t("purged_snaps"),
1180 0)));
1181 }
1182
1183 static ghobject_t make_pg_log_oid(spg_t pg) {
1184 std::stringstream ss;
1185 ss << "pglog_" << pg;
1186 std::string s;
1187 getline(ss, s);
1188 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1189 }
1190
1191 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1192 std::stringstream ss;
1193 ss << "pginfo_" << pg;
1194 std::string s;
1195 getline(ss, s);
1196 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1197 }
1198 static ghobject_t make_infos_oid() {
1199 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1200 return ghobject_t(oid);
1201 }
1202
1203 static ghobject_t make_final_pool_info_oid(int64_t pool) {
1204 return ghobject_t(
1205 hobject_t(
1206 sobject_t(
1207 object_t(std::string("final_pool_") + stringify(pool)),
1208 CEPH_NOSNAP)));
1209 }
1210
1211 static ghobject_t make_pg_num_history_oid() {
1212 return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
1213 }
1214
1215 static void recursive_remove_collection(CephContext* cct,
1216 ObjectStore *store,
1217 spg_t pgid,
1218 coll_t tmp);
1219
1220 /**
1221 * get_osd_initial_compat_set()
1222 *
1223 * Get the initial feature set for this OSD. Features
1224 * here are automatically upgraded.
1225 *
1226 * Return value: Initial osd CompatSet
1227 */
1228 static CompatSet get_osd_initial_compat_set();
1229
1230 /**
1231 * get_osd_compat_set()
1232 *
1233 * Get all features supported by this OSD
1234 *
1235 * Return value: CompatSet of all supported features
1236 */
1237 static CompatSet get_osd_compat_set();
1238
1239
1240 private:
1241 class C_Tick;
1242 class C_Tick_WithoutOSDLock;
1243
1244 // -- config settings --
1245 float m_osd_pg_epoch_max_lag_factor;
1246
1247 // -- superblock --
1248 OSDSuperblock superblock;
1249
1250 void write_superblock();
1251 void write_superblock(ObjectStore::Transaction& t);
1252 int read_superblock();
1253
1254 void clear_temp_objects();
1255
1256 CompatSet osd_compat;
1257
1258 // -- state --
1259 public:
1260 typedef enum {
1261 STATE_INITIALIZING = 1,
1262 STATE_PREBOOT,
1263 STATE_BOOTING,
1264 STATE_ACTIVE,
1265 STATE_STOPPING,
1266 STATE_WAITING_FOR_HEALTHY
1267 } osd_state_t;
1268
1269 static const char *get_state_name(int s) {
1270 switch (s) {
1271 case STATE_INITIALIZING: return "initializing";
1272 case STATE_PREBOOT: return "preboot";
1273 case STATE_BOOTING: return "booting";
1274 case STATE_ACTIVE: return "active";
1275 case STATE_STOPPING: return "stopping";
1276 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1277 default: return "???";
1278 }
1279 }
1280
1281 private:
1282 std::atomic<int> state{STATE_INITIALIZING};
1283
1284 public:
1285 int get_state() const {
1286 return state;
1287 }
1288 void set_state(int s) {
1289 state = s;
1290 }
1291 bool is_initializing() const {
1292 return state == STATE_INITIALIZING;
1293 }
1294 bool is_preboot() const {
1295 return state == STATE_PREBOOT;
1296 }
1297 bool is_booting() const {
1298 return state == STATE_BOOTING;
1299 }
1300 bool is_active() const {
1301 return state == STATE_ACTIVE;
1302 }
1303 bool is_stopping() const {
1304 return state == STATE_STOPPING;
1305 }
1306 bool is_waiting_for_healthy() const {
1307 return state == STATE_WAITING_FOR_HEALTHY;
1308 }
1309
1310 private:
1311
1312 ShardedThreadPool osd_op_tp;
1313
1314 void get_latest_osdmap();
1315
1316 // -- sessions --
1317 private:
1318 void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
1319
1320 ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
1321 std::set<ceph::ref_t<Session>> session_waiting_for_map;
1322
1323 /// Caller assumes refs for included Sessions
1324 void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
1325 std::lock_guard l(session_waiting_lock);
1326 out->swap(session_waiting_for_map);
1327 }
1328 void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1329 std::lock_guard l(session_waiting_lock);
1330 session_waiting_for_map.insert(session);
1331 }
1332 void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
1333 std::lock_guard l(session_waiting_lock);
1334 session_waiting_for_map.erase(session);
1335 }
1336 void dispatch_sessions_waiting_on_map() {
1337 std::set<ceph::ref_t<Session>> sessions_to_check;
1338 get_sessions_waiting_for_map(&sessions_to_check);
1339 for (auto i = sessions_to_check.begin();
1340 i != sessions_to_check.end();
1341 sessions_to_check.erase(i++)) {
1342 std::lock_guard l{(*i)->session_dispatch_lock};
1343 dispatch_session_waiting(*i, get_osdmap());
1344 }
1345 }
1346 void session_handle_reset(const ceph::ref_t<Session>& session) {
1347 std::lock_guard l(session->session_dispatch_lock);
1348 clear_session_waiting_on_map(session);
1349
1350 session->clear_backoffs();
1351
1352 /* Messages have connection refs, we need to clear the
1353 * connection->session->message->connection
1354 * cycles which result.
1355 * Bug #12338
1356 */
1357 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1358 }
1359
1360 private:
1361 /**
1362 * @defgroup monc helpers
1363 * @{
1364 * Right now we only have the one
1365 */
1366
1367 /**
1368 * Ask the Monitors for a sequence of OSDMaps.
1369 *
1370 * @param epoch The epoch to start with when replying
1371 * @param force_request True if this request forces a new subscription to
1372 * the monitors; false if an outstanding request that encompasses it is
1373 * sufficient.
1374 */
1375 void osdmap_subscribe(version_t epoch, bool force_request);
1376 /** @} monc helpers */
1377
1378 ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
1379 epoch_t latest_subscribed_epoch{0};
1380
1381 // -- heartbeat --
1382 /// information about a heartbeat peer
1383 struct HeartbeatInfo {
1384 int peer; ///< peer
1385 ConnectionRef con_front; ///< peer connection (front)
1386 ConnectionRef con_back; ///< peer connection (back)
1387 utime_t first_tx; ///< time we sent our first ping request
1388 utime_t last_tx; ///< last time we sent a ping request
1389 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1390 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1391 epoch_t epoch; ///< most recent epoch we wanted this peer
1392 /// number of connections we send and receive heartbeat pings/replies
1393 static constexpr int HEARTBEAT_MAX_CONN = 2;
1394 /// history of inflight pings, arranged by the timestamp we sent
1395 /// send time -> deadline -> remaining replies
1396 std::map<utime_t, std::pair<utime_t, int>> ping_history;
1397
1398 utime_t hb_interval_start;
1399 uint32_t hb_average_count = 0;
1400 uint32_t hb_index = 0;
1401
1402 uint32_t hb_total_back = 0;
1403 uint32_t hb_min_back = UINT_MAX;
1404 uint32_t hb_max_back = 0;
1405 std::vector<uint32_t> hb_back_pingtime;
1406 std::vector<uint32_t> hb_back_min;
1407 std::vector<uint32_t> hb_back_max;
1408
1409 uint32_t hb_total_front = 0;
1410 uint32_t hb_min_front = UINT_MAX;
1411 uint32_t hb_max_front = 0;
1412 std::vector<uint32_t> hb_front_pingtime;
1413 std::vector<uint32_t> hb_front_min;
1414 std::vector<uint32_t> hb_front_max;
1415
1416 bool is_stale(utime_t stale) const {
1417 if (ping_history.empty()) {
1418 return false;
1419 }
1420 utime_t oldest_deadline = ping_history.begin()->second.first;
1421 return oldest_deadline <= stale;
1422 }
1423
1424 bool is_unhealthy(utime_t now) const {
1425 if (ping_history.empty()) {
1426 /// we haven't sent a ping yet or we have got all replies;
1427 /// either way we are safe and healthy for now
1428 return false;
1429 }
1430
1431 utime_t oldest_deadline = ping_history.begin()->second.first;
1432 return now > oldest_deadline;
1433 }
1434
1435 bool is_healthy(utime_t now) const {
1436 if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
1437 // only declare ourselves healthy once we have received the first
1438 // replies from both the front and back connections
1439 return false;
1440 }
1441 return !is_unhealthy(now);
1442 }
1443
1444 void clear_mark_down(Connection *except = nullptr) {
1445 if (con_back && con_back != except) {
1446 con_back->mark_down();
1447 con_back->clear_priv();
1448 con_back.reset(nullptr);
1449 }
1450 if (con_front && con_front != except) {
1451 con_front->mark_down();
1452 con_front->clear_priv();
1453 con_front.reset(nullptr);
1454 }
1455 }
1456 };
1457
1458 ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
1459 std::map<int, int> debug_heartbeat_drops_remaining;
1460 ceph::condition_variable heartbeat_cond;
1461 bool heartbeat_stop;
1462 std::atomic<bool> heartbeat_need_update;
1463 std::map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1464 utime_t last_mon_heartbeat;
1465 Messenger *hb_front_client_messenger;
1466 Messenger *hb_back_client_messenger;
1467 Messenger *hb_front_server_messenger;
1468 Messenger *hb_back_server_messenger;
1469 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1470 double daily_loadavg;
1471 ceph::mono_time startup_time;
1472
1473 // Track ping response times using a vector as a circular buffer
1474 // MUST BE A POWER OF 2
1475 const uint32_t hb_vector_size = 16;
1476
1477 void _add_heartbeat_peer(int p);
1478 void _remove_heartbeat_peer(int p);
1479 bool heartbeat_reset(Connection *con);
1480 void maybe_update_heartbeat_peers();
1481 void reset_heartbeat_peers(bool all);
1482 bool heartbeat_peers_need_update() {
1483 return heartbeat_need_update.load();
1484 }
1485 void heartbeat_set_peers_need_update() {
1486 heartbeat_need_update.store(true);
1487 }
1488 void heartbeat_clear_peers_need_update() {
1489 heartbeat_need_update.store(false);
1490 }
1491 void heartbeat();
1492 void heartbeat_check();
1493 void heartbeat_entry();
1494 void need_heartbeat_peer_update();
1495
1496 void heartbeat_kick() {
1497 std::lock_guard l(heartbeat_lock);
1498 heartbeat_cond.notify_all();
1499 }
1500
1501 struct T_Heartbeat : public Thread {
1502 OSD *osd;
1503 explicit T_Heartbeat(OSD *o) : osd(o) {}
1504 void *entry() override {
1505 osd->heartbeat_entry();
1506 return 0;
1507 }
1508 } heartbeat_thread;
1509
1510 public:
1511 bool heartbeat_dispatch(Message *m);
1512
1513 struct HeartbeatDispatcher : public Dispatcher {
1514 OSD *osd;
1515 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1516
1517 bool ms_can_fast_dispatch_any() const override { return true; }
1518 bool ms_can_fast_dispatch(const Message *m) const override {
1519 switch (m->get_type()) {
1520 case CEPH_MSG_PING:
1521 case MSG_OSD_PING:
1522 return true;
1523 default:
1524 return false;
1525 }
1526 }
1527 void ms_fast_dispatch(Message *m) override {
1528 osd->heartbeat_dispatch(m);
1529 }
1530 bool ms_dispatch(Message *m) override {
1531 return osd->heartbeat_dispatch(m);
1532 }
1533 bool ms_handle_reset(Connection *con) override {
1534 return osd->heartbeat_reset(con);
1535 }
1536 void ms_handle_remote_reset(Connection *con) override {}
1537 bool ms_handle_refused(Connection *con) override {
1538 return osd->ms_handle_refused(con);
1539 }
1540 int ms_handle_authentication(Connection *con) override {
1541 return true;
1542 }
1543 } heartbeat_dispatcher;
1544
1545 private:
1546 // -- waiters --
1547 std::list<OpRequestRef> finished;
1548
1549 void take_waiters(std::list<OpRequestRef>& ls) {
1550 ceph_assert(ceph_mutex_is_locked(osd_lock));
1551 finished.splice(finished.end(), ls);
1552 }
1553 void do_waiters();
1554
1555 // -- op tracking --
1556 OpTracker op_tracker;
1557 void test_ops(std::string command, std::string args, std::ostream& ss);
1558 friend class TestOpsSocketHook;
1559 TestOpsSocketHook *test_ops_hook;
1560 friend struct C_FinishSplits;
1561 friend struct C_OpenPGs;
1562
1563 protected:
1564
1565 /*
1566 * The ordered op delivery chain is:
1567 *
1568 * fast dispatch -> scheduler back
1569 * scheduler front <-> to_process back
1570 * to_process front -> RunVis(item)
1571 * <- queue_front()
1572 *
1573 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1574 * pushed back up into to_process and/or scheduler while order is preserved.
1575 *
1576 * Multiple worker threads can operate on each shard.
1577 *
1578 * Under normal circumstances, num_running == to_process.size(). There are
1579 * two times when that is not true: (1) when waiting_for_pg == true and
1580 * to_process is accumulating requests that are waiting for the pg to be
1581 * instantiated; in that case they will all get requeued together by
1582 * wake_pg_waiters, and (2) just after wake_pg_waiters has run, when it has cleared
1583 * waiting_for_pg and already requeued the items.
1584 */
1585 friend class ceph::osd::scheduler::PGOpItem;
1586 friend class ceph::osd::scheduler::PGPeeringItem;
1587 friend class ceph::osd::scheduler::PGRecovery;
1588 friend class ceph::osd::scheduler::PGRecoveryMsg;
1589 friend class ceph::osd::scheduler::PGDelete;
1590
1591 class ShardedOpWQ
1592 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
1593 {
1594 OSD *osd;
1595 bool m_fast_shutdown = false;
1596 public:
1597 ShardedOpWQ(OSD *o,
1598 ceph::timespan ti,
1599 ceph::timespan si,
1600 ShardedThreadPool* tp)
1601 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
1602 osd(o) {
1603 }
1604
1605 void _add_slot_waiter(
1606 spg_t token,
1607 OSDShardPGSlot *slot,
1608 OpSchedulerItem&& qi);
1609
1610 /// try to do some work
1611 void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
1612
1613 void stop_for_fast_shutdown();
1614
1615 /// enqueue a new item
1616 void _enqueue(OpSchedulerItem&& item) override;
1617
1618 /// requeue an old item (at the front of the line)
1619 void _enqueue_front(OpSchedulerItem&& item) override;
1620
1621 void return_waiting_threads() override {
1622 for(uint32_t i = 0; i < osd->num_shards; i++) {
1623 OSDShard* sdata = osd->shards[i];
1624 assert (NULL != sdata);
1625 std::scoped_lock l{sdata->sdata_wait_lock};
1626 sdata->stop_waiting = true;
1627 sdata->sdata_cond.notify_all();
1628 }
1629 }
1630
1631 void stop_return_waiting_threads() override {
1632 for(uint32_t i = 0; i < osd->num_shards; i++) {
1633 OSDShard* sdata = osd->shards[i];
1634 assert (NULL != sdata);
1635 std::scoped_lock l{sdata->sdata_wait_lock};
1636 sdata->stop_waiting = false;
1637 }
1638 }
1639
1640 void dump(ceph::Formatter *f) {
1641 for(uint32_t i = 0; i < osd->num_shards; i++) {
1642 auto &&sdata = osd->shards[i];
1643
1644 char queue_name[32] = {0};
1645 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1646 ceph_assert(NULL != sdata);
1647
1648 std::scoped_lock l{sdata->shard_lock};
1649 f->open_object_section(queue_name);
1650 sdata->scheduler->dump(*f);
1651 f->close_section();
1652 }
1653 }
1654
1655 bool is_shard_empty(uint32_t thread_index) override {
1656 uint32_t shard_index = thread_index % osd->num_shards;
1657 auto &&sdata = osd->shards[shard_index];
1658 ceph_assert(sdata);
1659 std::lock_guard l(sdata->shard_lock);
1660 if (thread_index < osd->num_shards) {
1661 return sdata->scheduler->empty() && sdata->context_queue.empty();
1662 } else {
1663 return sdata->scheduler->empty();
1664 }
1665 }
1666
1667 void handle_oncommits(std::list<Context*>& oncommits) {
1668 for (auto p : oncommits) {
1669 p->complete(0);
1670 }
1671 }
1672 } op_shardedwq;
1673
1674
1675 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
1676 void dequeue_op(
1677 PGRef pg, OpRequestRef op,
1678 ThreadPool::TPHandle &handle);
1679
1680 void enqueue_peering_evt(
1681 spg_t pgid,
1682 PGPeeringEventRef ref);
1683 void dequeue_peering_evt(
1684 OSDShard *sdata,
1685 PG *pg,
1686 PGPeeringEventRef ref,
1687 ThreadPool::TPHandle& handle);
1688
1689 void dequeue_delete(
1690 OSDShard *sdata,
1691 PG *pg,
1692 epoch_t epoch,
1693 ThreadPool::TPHandle& handle);
1694
1695 friend class PG;
1696 friend struct OSDShard;
1697 friend class PrimaryLogPG;
1698 friend class PgScrubber;
1699
1700
1701 protected:
1702
1703 // -- osd map --
1704 // TODO: switch to std::atomic<OSDMapRef> when C++20 is available.
1705 OSDMapRef _osdmap;
1706 void set_osdmap(OSDMapRef osdmap) {
1707 std::atomic_store(&_osdmap, osdmap);
1708 }
1709 OSDMapRef get_osdmap() const {
1710 return std::atomic_load(&_osdmap);
1711 }
1712 epoch_t get_osdmap_epoch() const {
1713 // XXX: performance?
1714 auto osdmap = get_osdmap();
1715 return osdmap ? osdmap->get_epoch() : 0;
1716 }
1717
1718 pool_pg_num_history_t pg_num_history;
1719
1720 ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
1721 std::list<OpRequestRef> waiting_for_osdmap;
1722 std::deque<utime_t> osd_markdown_log;
1723
1724 friend struct send_map_on_destruct;
1725
1726 void wait_for_new_map(OpRequestRef op);
1727 void handle_osd_map(class MOSDMap *m);
1728 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1729 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1730 void note_down_osd(int osd);
1731 void note_up_osd(int osd);
1732 friend struct C_OnMapCommit;
1733
1734 bool advance_pg(
1735 epoch_t advance_to,
1736 PG *pg,
1737 ThreadPool::TPHandle &handle,
1738 PeeringCtx &rctx);
1739 void consume_map();
1740 void activate_map();
1741
1742 // osd map cache (past osd maps)
1743 OSDMapRef get_map(epoch_t e) {
1744 return service.get_map(e);
1745 }
1746 OSDMapRef add_map(OSDMap *o) {
1747 return service.add_map(o);
1748 }
1749 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
1750 return service.get_map_bl(e, bl);
1751 }
1752
1753 public:
1754 // -- shards --
1755 std::vector<OSDShard*> shards;
1756 uint32_t num_shards = 0;
1757
1758 void inc_num_pgs() {
1759 ++num_pgs;
1760 }
1761 void dec_num_pgs() {
1762 --num_pgs;
1763 }
1764 int get_num_pgs() const {
1765 return num_pgs;
1766 }
1767
1768 protected:
1769 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
1770 /// merge epoch -> target pgid -> source pgid -> pg
1771 std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
1772
1773 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1774 unsigned need);
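/*
 * Illustrative sketch (editorial; hypothetical values, assumes PG::get_pgid()):
 * merge_waiters is keyed epoch -> target pgid -> source pgid -> PG, so
 * parking a source PG for a pending merge looks roughly like:
 *
 *   std::lock_guard l(merge_lock);
 *   auto& sources = merge_waiters[nextmap->get_epoch()][target];
 *   sources[source->get_pgid()] = source;
 *   bool all_present = (sources.size() >= need);  // ready to merge?
 */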
1775
1776 // -- placement groups --
1777 std::atomic<size_t> num_pgs = {0};
1778
1779 std::mutex pending_creates_lock;
1780 using create_from_osd_t = std::pair<spg_t, bool /* is primary */>;
1781 std::set<create_from_osd_t> pending_creates_from_osd;
1782 unsigned pending_creates_from_mon = 0;
1783
1784 PGRecoveryStats pg_recovery_stats;
1785
1786 PGRef _lookup_pg(spg_t pgid);
1787 PGRef _lookup_lock_pg(spg_t pgid);
1788 void register_pg(PGRef pg);
1789 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
1790
1791 void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
1792 void _get_pgids(std::vector<spg_t> *v);
1793
1794 public:
1795 PGRef lookup_lock_pg(spg_t pgid);
1796
1797 std::set<int64_t> get_mapped_pools();
1798
1799 protected:
1800 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
1801
1802 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1803 spg_t pgid, bool is_mon_create);
1804 void resume_creating_pg();
1805
1806 void load_pgs();
1807
1808 /// build initial pg history and intervals on create
1809 void build_initial_pg_history(
1810 spg_t pgid,
1811 epoch_t created,
1812 utime_t created_stamp,
1813 pg_history_t *h,
1814 PastIntervals *pi);
1815
1816 epoch_t last_pg_create_epoch;
1817
1818 void handle_pg_create(OpRequestRef op);
1819
1820 void split_pgs(
1821 PG *parent,
1822 const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
1823 OSDMapRef curmap,
1824 OSDMapRef nextmap,
1825 PeeringCtx &rctx);
1826 void _finish_splits(std::set<PGRef>& pgs);
1827
1828 // == monitor interaction ==
1829 ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
1830 utime_t last_mon_report;
1831 Finisher boot_finisher;
1832
1833 // -- boot --
1834 void start_boot();
1835 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
1836 void _preboot(epoch_t oldest, epoch_t newest);
1837 void _send_boot();
1838 void _collect_metadata(std::map<std::string,std::string> *pmeta);
1839 void _get_purged_snaps();
1840 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
1841
1842 void start_waiting_for_healthy();
1843 bool _is_healthy();
1844
1845 void send_full_update();
1846
1847 friend struct CB_OSD_GetVersion;
1848
1849 // -- alive --
1850 epoch_t up_thru_wanted;
1851
1852 void queue_want_up_thru(epoch_t want);
1853 void send_alive();
1854
1855 // -- full map requests --
1856 epoch_t requested_full_first, requested_full_last;
1857
1858 void request_full_map(epoch_t first, epoch_t last);
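/// re-issue the current full-map request from scratch; the stored range is
/// cleared first so that request_full_map() (assumed to ignore duplicate
/// requests for an already-requested range) sends it again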
1859 void rerequest_full_maps() {
1860 epoch_t first = requested_full_first;
1861 epoch_t last = requested_full_last;
1862 requested_full_first = 0;
1863 requested_full_last = 0;
1864 request_full_map(first, last);
1865 }
1866 void got_full_map(epoch_t e);
1867
1868 // -- failures --
1869 std::map<int,utime_t> failure_queue;
1870 std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
1871
1872 void requeue_failures();
1873 void send_failures();
1874 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
1875 void cancel_pending_failures();
1876
1877 ceph::coarse_mono_clock::time_point last_sent_beacon;
1878 ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
1879 epoch_t min_last_epoch_clean = 0;
1880 // which pgs were scanned for min_lec
1881 std::vector<pg_t> min_last_epoch_clean_pgs;
1882 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
1883
1884 ceph_tid_t get_tid() {
1885 return service.get_tid();
1886 }
1887
1888 // -- generic pg peering --
1889 void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
1890 ThreadPool::TPHandle *handle = NULL);
1891
1892 bool require_mon_peer(const Message *m);
1893 bool require_mon_or_mgr_peer(const Message *m);
1894 bool require_osd_peer(const Message *m);
1895 /**
1896 * Verifies that we were alive in the given epoch, and that we
1897 * still are.
1898 */
1899 bool require_self_aliveness(const Message *m, epoch_t alive_since);
1900 /**
1901 * Verifies that the OSD that sent the given message has the same
1902 * address as in the given map.
1903 * @pre the message was sent by an OSD using the cluster messenger
1904 */
1905 bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
1906 bool is_fast_dispatch);
1907
1908 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
1909 bool is_fast_dispatch);
1910
1911 void handle_fast_pg_create(MOSDPGCreate2 *m);
1912 void handle_pg_query_nopg(const MQuery& q);
1913 void handle_fast_pg_notify(MOSDPGNotify *m);
1914 void handle_pg_notify_nopg(const MNotifyRec& q);
1915 void handle_fast_pg_info(MOSDPGInfo *m);
1916 void handle_fast_pg_remove(MOSDPGRemove *m);
1917
1918 public:
1919 // used by OSDShard
1920 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
1921 protected:
1922
1923 void handle_fast_force_recovery(MOSDForceRecovery *m);
1924
1925 // -- commands --
1926 void handle_command(class MCommand *m);
1927
1928
1929 // -- pg recovery --
1930 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
1931 ThreadPool::TPHandle &handle);
1932
1933
1934 // -- scrubbing --
1935 void sched_scrub();
1936 void resched_all_scrubs();
1937 bool scrub_random_backoff();
1938
1939 // -- status reporting --
1940 MPGStats *collect_pg_stats();
1941 std::vector<DaemonHealthMetric> get_health_metrics();
1942
1943
1944 private:
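// Message types listed in ms_can_fast_dispatch() below are handled directly
// in the messenger's receive path via ms_fast_dispatch(), rather than being
// queued for the ordinary dispatch thread; this keeps client I/O and PG
// traffic off the slower ms_dispatch() path.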
1945 bool ms_can_fast_dispatch_any() const override { return true; }
1946 bool ms_can_fast_dispatch(const Message *m) const override {
1947 switch (m->get_type()) {
1948 case CEPH_MSG_PING:
1949 case CEPH_MSG_OSD_OP:
1950 case CEPH_MSG_OSD_BACKOFF:
1951 case MSG_OSD_SCRUB2:
1952 case MSG_OSD_FORCE_RECOVERY:
1953 case MSG_MON_COMMAND:
1954 case MSG_OSD_PG_CREATE2:
1955 case MSG_OSD_PG_QUERY:
1956 case MSG_OSD_PG_QUERY2:
1957 case MSG_OSD_PG_INFO:
1958 case MSG_OSD_PG_INFO2:
1959 case MSG_OSD_PG_NOTIFY:
1960 case MSG_OSD_PG_NOTIFY2:
1961 case MSG_OSD_PG_LOG:
1962 case MSG_OSD_PG_TRIM:
1963 case MSG_OSD_PG_REMOVE:
1964 case MSG_OSD_BACKFILL_RESERVE:
1965 case MSG_OSD_RECOVERY_RESERVE:
1966 case MSG_OSD_REPOP:
1967 case MSG_OSD_REPOPREPLY:
1968 case MSG_OSD_PG_PUSH:
1969 case MSG_OSD_PG_PULL:
1970 case MSG_OSD_PG_PUSH_REPLY:
1971 case MSG_OSD_PG_SCAN:
1972 case MSG_OSD_PG_BACKFILL:
1973 case MSG_OSD_PG_BACKFILL_REMOVE:
1974 case MSG_OSD_EC_WRITE:
1975 case MSG_OSD_EC_WRITE_REPLY:
1976 case MSG_OSD_EC_READ:
1977 case MSG_OSD_EC_READ_REPLY:
1978 case MSG_OSD_SCRUB_RESERVE:
1979 case MSG_OSD_REP_SCRUB:
1980 case MSG_OSD_REP_SCRUBMAP:
1981 case MSG_OSD_PG_UPDATE_LOG_MISSING:
1982 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
1983 case MSG_OSD_PG_RECOVERY_DELETE:
1984 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
1985 case MSG_OSD_PG_LEASE:
1986 case MSG_OSD_PG_LEASE_ACK:
1987 return true;
1988 default:
1989 return false;
1990 }
1991 }
1992 void ms_fast_dispatch(Message *m) override;
1993 bool ms_dispatch(Message *m) override;
1994 void ms_handle_connect(Connection *con) override;
1995 void ms_handle_fast_connect(Connection *con) override;
1996 void ms_handle_fast_accept(Connection *con) override;
1997 int ms_handle_authentication(Connection *con) override;
1998 bool ms_handle_reset(Connection *con) override;
1999 void ms_handle_remote_reset(Connection *con) override {}
2000 bool ms_handle_refused(Connection *con) override;
2001
2002 public:
2003 /* internal and external can point to the same messenger; they will still
2004 * be cleaned up properly */
2005 OSD(CephContext *cct_,
2006 std::unique_ptr<ObjectStore> store_,
2007 int id,
2008 Messenger *internal,
2009 Messenger *external,
2010 Messenger *hb_front_client,
2011 Messenger *hb_back_client,
2012 Messenger *hb_front_server,
2013 Messenger *hb_back_server,
2014 Messenger *osdc_messenger,
2015 MonClient *mc, const std::string &dev, const std::string &jdev,
2016 ceph::async::io_context_pool& poolctx);
2017 ~OSD() override;
2018
2019 // static bits
2020 static int mkfs(CephContext *cct,
2021 std::unique_ptr<ObjectStore> store,
2022 uuid_d fsid,
2023 int whoami,
2024 std::string osdspec_affinity);
2025
2026 /* remove any non-user xattrs from the given attr map */
2027 void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
2028 for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
2029 iter != attrs.end();
2030 ) {
2031 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2032 attrs.erase(iter++);
2033 else ++iter;
2034 }
2035 }
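/*
 * Illustrative sketch (editorial; hypothetical attr names): only keys that
 * start with '_' and are longer than one character survive, i.e. user
 * xattrs; everything else is erased:
 *
 *   std::map<std::string, ceph::buffer::ptr> attrs;
 *   attrs["_user.foo"];   // kept: starts with '_' and size > 1
 *   attrs["_"];           // erased: single-character key
 *   attrs["snapset"];     // erased: does not start with '_'
 *   filter_xattrs(attrs); // leaves only "_user.foo"
 */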
2036
2037 private:
2038 int mon_cmd_maybe_osd_create(std::string &cmd);
2039 int update_crush_device_class();
2040 int update_crush_location();
2041
2042 static int write_meta(CephContext *cct,
2043 ObjectStore *store,
2044 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
2045
2046 void handle_scrub(class MOSDScrub *m);
2047 void handle_fast_scrub(class MOSDScrub2 *m);
2048 void handle_osd_ping(class MOSDPing *m);
2049
2050 size_t get_num_cache_shards();
2051 int get_num_op_shards();
2052 int get_num_op_threads();
2053
2054 float get_osd_recovery_sleep();
2055 float get_osd_delete_sleep();
2056 float get_osd_snap_trim_sleep();
2057
2058 int get_recovery_max_active();
2059 void maybe_override_max_osd_capacity_for_qos();
2060 bool maybe_override_options_for_qos();
2061 int run_osd_bench_test(int64_t count,
2062 int64_t bsize,
2063 int64_t osize,
2064 int64_t onum,
2065 double *elapsed,
2066 std::ostream& ss);
2067 int mon_cmd_set_config(const std::string &key, const std::string &val);
2068 bool unsupported_objstore_for_qos();
2069
2070 void scrub_purged_snaps();
2071 void probe_smart(const std::string& devid, std::ostream& ss);
2072
2073 public:
2074 static int peek_meta(ObjectStore *store,
2075 std::string *magic,
2076 uuid_d *cluster_fsid,
2077 uuid_d *osd_fsid,
2078 int *whoami,
2079 ceph_release_t *min_osd_release);
2080
2081
2082 // startup/shutdown
2083 int pre_init();
2084 int init();
2085 void final_init();
2086
2087 int enable_disable_fuse(bool stop);
2088 int set_numa_affinity();
2089
2090 void suicide(int exitcode);
2091 int shutdown();
2092
2093 void handle_signal(int signum);
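/*
 * Illustrative sketch (editorial; assumed to mirror the daemon entry point,
 * e.g. ceph_osd.cc, not normative): the lifecycle methods above are expected
 * to be driven roughly as:
 *
 *   OSD *osd = new OSD(...);
 *   if (osd->pre_init() < 0)   // pre-messenger sanity checks
 *     exit(1);
 *   // ... start messengers ...
 *   if (osd->init() < 0)       // mount store, load PGs, authenticate to mons
 *     exit(1);
 *   osd->final_init();         // register admin socket hooks, etc.
 *   // ... run until handle_signal() triggers shutdown() ...
 */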
2094
2095 /// check if we can throw out op from a disconnected client
2096 static bool op_is_discardable(const MOSDOp *m);
2097
2098 public:
2099 OSDService service;
2100 friend class OSDService;
2101
2102 private:
2103 void set_perf_queries(const ConfigPayload &config_payload);
2104 MetricPayload get_perf_reports();
2105
2106 ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
2107 std::list<OSDPerfMetricQuery> m_perf_queries;
2108 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
2109 };
2110
2111
2112 // compatibility of the executable
2113 extern const CompatSet::Feature ceph_osd_feature_compat[];
2114 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2115 extern const CompatSet::Feature ceph_osd_feature_incompat[];
2116
2117 #endif // CEPH_OSD_H