// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/async/context_pool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/config_cacher.h"
#include "common/zipkin_trace.h"
#include "common/ceph_timer.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"

#include "include/CompatSet.h"
#include "include/common_fwd.h"

#include "OpRequest.h"
#include "Session.h"

#include "osd/scheduler/OpScheduler.h"

#include <atomic>
#include <map>
#include <memory>
#include <string>

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"
#include "scrubber/osd_scrub_sched.h"

#define CEPH_OSD_PROTOCOL 10 /* cluster internal */

/*

  lock ordering for pg map

    PG::lock
      ShardData::lock
        OSD::pg_map_lock

 */

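/*
  Illustrative reading of the ordering above (an interpretation spelled out
  for clarity, not new policy): a thread that already holds PG::lock may
  acquire ShardData::lock, and a thread holding ShardData::lock may acquire
  OSD::pg_map_lock -- never the reverse, or two threads could deadlock.
 */
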
class Messenger;
class Message;
class MonClient;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;
class KeyStore;

class Watch;
class PrimaryLogPG;

class TestOpsSocketHook;
struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;

class MOSDPGCreate2;
class MOSDPGNotify;
class MOSDPGInfo;
class MOSDPGRemove;
class MOSDForceRecovery;
class MMonGetPurgedSnapsReply;

class OSD;

7c673cae 97class OSDService {
9f95a23c 98 using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
7c673cae
FG
99public:
100 OSD *osd;
101 CephContext *cct;
11fdf7f2 102 ObjectStore::CollectionHandle meta_ch;
7c673cae 103 const int whoami;
20effc67 104 ObjectStore * const store;
7c673cae
FG
105 LogClient &log_client;
106 LogChannelRef clog;
107 PGRecoveryStats &pg_recovery_stats;
108private:
109 Messenger *&cluster_messenger;
110 Messenger *&client_messenger;
111public:
112 PerfCounters *&logger;
113 PerfCounters *&recoverystate_perf;
114 MonClient *&monc;
7c673cae 115
11fdf7f2
TL
116 md_config_cacher_t<Option::size_t> osd_max_object_size;
117 md_config_cacher_t<bool> osd_skip_data_digest;
118
9f95a23c
TL
119 void enqueue_back(OpSchedulerItem&& qi);
120 void enqueue_front(OpSchedulerItem&& qi);
7c673cae
FG
121
122 void maybe_inject_dispatch_delay() {
11fdf7f2 123 if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
7c673cae 124 if (rand() % 10000 <
11fdf7f2 125 g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
7c673cae 126 utime_t t;
11fdf7f2 127 t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
7c673cae
FG
128 t.sleep();
129 }
130 }
131 }
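  // (illustrative) with osd_debug_inject_dispatch_delay_probability = 0.1
  // and osd_debug_inject_dispatch_delay_duration = 0.05, roughly 10% of
  // dispatches sleep for ~50 ms at this point.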

  ceph::signedspan get_mnow();

private:
  // -- superblock --
  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    std::lock_guard l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    std::lock_guard l(publish_lock);
    superblock = block;
  }

  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    std::lock_guard l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    std::lock_guard l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    std::lock_guard l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre_published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  ceph::condition_variable pre_publish_cond;
  int pre_publish_waiter = 0;

public:
  void pre_publish_map(OSDMapRef map) {
    std::lock_guard l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  std::map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    std::lock_guard l(pre_publish_lock);
    epoch_t e = next_osdmap->get_epoch();
    std::map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(std::make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    std::lock_guard l(pre_publish_lock);
    std::map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    ceph_assert(i != map_reservations.end());
    ceph_assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    if (pre_publish_waiter) {
      pre_publish_cond.notify_all();
    }
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    std::unique_lock l{pre_publish_lock};
    ceph_assert(next_osdmap);
    pre_publish_waiter++;
    pre_publish_cond.wait(l, [this] {
      auto i = map_reservations.cbegin();
      return (i == map_reservations.cend() ||
              i->first >= next_osdmap->get_epoch());
    });
    pre_publish_waiter--;
  }
  OSDMapRef get_next_osdmap() {
    std::lock_guard l(pre_publish_lock);
    return next_osdmap;
  }

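  /*
   * Illustrative usage sketch (an assumed caller pattern; the real callers
   * live in OSD.cc, not in this header):
   *
   *   OSDMapRef m = service.get_nextmap_reserved(); // epoch now reserved
   *   // ... initiate connections / send messages consistent with m ...
   *   service.release_map(m);                       // drop the reservation
   *
   * await_reserved_maps() can then be used to block until no reservation
   * older than next_osdmap remains.
   */
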
  void maybe_share_map(Connection *con,
                       const OSDMapRef& osdmap,
                       epoch_t peer_epoch_lb=0);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con,
                            const OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                     OSDSuperblock& superblock);

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
  void send_message_osd_cluster(MessageRef m, Connection *con) {
    con->send_message2(std::move(m));
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() const;


public:

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
                      std::vector<pg_log_op_return_item_t> op_returns);
  void handle_misdirected_op(PG *pg, OpRequestRef op);

 private:
  /**
   * The entity that maintains the set of PGs we may scrub (i.e. those for
   * which we are the primary), and schedules their scrubbing.
   */
  ScrubQueue m_scrub_queue;

 public:
  ScrubQueue& get_scrub_services() { return m_scrub_queue; }

  /**
   * A callback used by the ScrubQueue object to initiate a scrub on a specific PG.
   *
   * The request might fail for multiple reasons, as ScrubQueue cannot on its
   * own check some of the PG-specific preconditions; those are checked here.
   * See the Scrub::schedule_result_t definition.
   *
   * @param pgid to scrub
   * @param allow_requested_repair_only
   * @return a Scrub::schedule_result_t detailing either a success, or the failure reason.
   */
  Scrub::schedule_result_t initiate_a_scrub(spg_t pgid, bool allow_requested_repair_only);


 private:
  // -- agent shared state --
  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
  ceph::condition_variable agent_cond;
  std::map<uint64_t, std::set<PGRef> > agent_queue;
  std::set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
  std::set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false;  // inserting higher-priority queue
    std::set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.notify_all();
    nq.insert(pg);
  }
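  // (illustrative) with agent_queue = { 10 -> {pgA}, 20 -> {pgB} },
  // rbegin() sees priority 20, so an _enqueue() at priority 30 marks
  // agent_queue_pos invalid via agent_valid_iterator = false.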

  void _dequeue(PG *pg, uint64_t old_priority) {
    std::set<PGRef>& oq = agent_queue[old_priority];
    std::set<PGRef>::iterator p = oq.find(pg);
    ceph_assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    std::lock_guard l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    std::lock_guard l(agent_lock);
    ceph_assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    std::lock_guard l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    std::lock_guard l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    agent_cond.notify_all();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ++agent_ops;
    ceph_assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    ceph_assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.notify_all();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    std::lock_guard l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count ++;
  }

  void agent_dec_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count --;
  }

private:
  /// throttle promotion attempts
  std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability thousands.  one word.
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless!  we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   // no throttle (promote)
  }
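  // (illustrative) with promote_probability_millis at 250, roughly 25% of
  // attempts pass the dice roll and may promote; the rest are throttled
  // without taking any lock.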
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();
  unsigned get_num_shards() const {
    return m_objecter_finishers;
  }
  Finisher* get_objecter_finisher(int shard) {
    return objecter_finishers[shard].get();
  }

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  ceph::async::io_context_pool& poolctx;
  std::unique_ptr<Objecter> objecter;
  int m_objecter_finishers;
  std::vector<std::unique_ptr<Finisher>> objecter_finishers;

  // -- Watch --
  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    std::lock_guard l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }
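  // (illustrative) cur_epoch 0x12 with next_notif_id at 2 yields
  // 0x0000001200000002: the epoch in the high 32 bits, a monotonic per-OSD
  // counter in the low 32 bits.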

  // -- Recovery/Backfill Request Scheduling --
  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
  SafeTimer recovery_request_timer;

  // For async recovery sleep
  bool recovery_needs_sleep = true;
  ceph::real_clock::time_point recovery_schedule_time;

  // For recovery & scrub & snap
  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
  SafeTimer sleep_timer;

  // -- tids --
  // for ops i issue
  std::atomic<unsigned int> last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t, Finisher> local_reserver;
  AsyncReserver<spg_t, Finisher> remote_reserver;

  // -- pg merge --
  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
  std::map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
  std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
  std::set<pg_t> not_ready_to_merge_source;
  std::map<pg_t,pg_t> not_ready_to_merge_target;
  std::set<pg_t> sent_ready_to_merge_source;

  void set_ready_to_merge_source(PG *pg,
                                 eversion_t version);
  void set_ready_to_merge_target(PG *pg,
                                 eversion_t version,
                                 epoch_t last_epoch_started,
                                 epoch_t last_epoch_clean);
  void set_not_ready_to_merge_source(pg_t source);
  void set_not_ready_to_merge_target(pg_t target, pg_t source);
  void clear_ready_to_merge(PG *pg);
  void send_ready_to_merge();
  void _send_ready_to_merge();
  void clear_sent_ready_to_merge();
  void prune_sent_ready_to_merge(const OSDMapRef& osdmap);

  // -- pg_temp --
private:
  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
  struct pg_temp_t {
    std::vector<int> acting;
    bool forced = false;
  };
  std::map<pg_t, pg_temp_t> pg_temp_wanted;
  std::map<pg_t, pg_temp_t> pg_temp_pending;
  void _sent_pg_temp();
  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
  void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
                          bool forced = false);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
  std::set<pg_t> pg_created;
  void send_pg_created(pg_t pgid);
  void prune_pg_created();
  void send_pg_created();

  AsyncReserver<spg_t, Finisher> snap_reserver;
  void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
  void queue_for_snap_trim(PG *pg);
  void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);

  void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);

  /// queue the message (-> event) that all replicas have reserved scrub resources for us
  void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);

  /// queue the message (-> event) that some replicas denied our scrub resources request
  void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
  /// of the primary map being created by the backend.
  void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals a change in the number of in-flight recovery writes
  void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that all pending updates were applied
  void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that the selected chunk (objects range) is available for scrubbing
  void queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority);

  /// The selected chunk is blocked by user operations, and cannot be scrubbed now
  void queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority);

  /// The block-range that was locked and prevented the scrubbing is now freed
  void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that all write OPs are done
  void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that the local (Primary's) scrub map is ready
  void queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
  void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that all chunks were handled
  /// Note: always with high priority, as it must be acted upon before the
  /// next scrub request arrives from the Primary (and the primary is free
  /// to send the request once the replica's map is received).
  void queue_scrub_is_finished(PG* pg);

  /// Signals that there are more chunks to handle
  void queue_scrub_next_chunk(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that we have finished comparing the maps for this chunk
  /// Note: required, as in Crimson this operation is 'futurized'.
  void queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority);

  void queue_for_rep_scrub(PG* pg,
                           Scrub::scrub_prio_t with_high_priority,
                           unsigned int qu_priority,
                           Scrub::act_token_t act_token);

  /// Signals a change in the number of in-flight recovery writes
  void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);

  /// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
  /// trigger a re-check of the availability of the scrub map prepared by the
  /// backend.
  void queue_for_rep_scrub_resched(PG* pg,
                                   Scrub::scrub_prio_t with_high_priority,
                                   unsigned int qu_priority,
                                   Scrub::act_token_t act_token);

  void queue_for_pg_delete(spg_t pgid, epoch_t e);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

private:
  // -- pg recovery and associated throttling --
  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
  std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;

  /// queue a scrub-related message for a PG
  template <class MSG_TYPE>
  void queue_scrub_event_msg(PG* pg,
                             Scrub::scrub_prio_t with_priority,
                             unsigned int qu_priority,
                             Scrub::act_token_t act_token);

  /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
  /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
  template <class MSG_TYPE>
  void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  std::map<spg_t, std::set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes);
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    std::lock_guard l(recovery_lock);
    return recovery_paused;
  }
  void unpause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    std::lock_guard l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);
    awaiting_throttle.remove_if(
      [pg](decltype(awaiting_throttle)::const_reference awaiting) {
        return awaiting.second.get() == pg;
      });
  }

  unsigned get_target_pg_log_entries() const;

  // delayed pg activation
  void queue_for_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);

    if (pg->is_forced_recovery_or_backfill()) {
      awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }
  void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
    std::lock_guard l(recovery_lock);
    _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
  }

  void queue_check_readable(spg_t spgid,
                            epoch_t lpr,
                            ceph::signedspan delay = ceph::signedspan::zero());

  // osd map cache (past osd maps)
  ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
  SharedLRU<epoch_t, const OSDMap> map_cache;
  SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
  SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;

  OSDMapRef try_get_map(epoch_t e);
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    ceph_assert(ret);
    return ret;
  }
  OSDMapRef add_map(OSDMap *o) {
    std::lock_guard l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);

  void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
  bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
    std::lock_guard l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);

  void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
  bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);

  /// identify split child pgids over an osdmap interval
  void identify_splits_and_merges(
    OSDMapRef old_map,
    OSDMapRef new_map,
    spg_t pgid,
    std::set<std::pair<spg_t,epoch_t>> *new_children,
    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);

  void need_heartbeat_peer_update();

  void init();
  void final_init();
  void start_shutdown();
  void shutdown_reserver();
  void shutdown();

  // -- stats --
  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
  osd_stat_t osd_stat;
  uint32_t seq = 0;

  void set_statfs(const struct store_statfs_t &stbuf,
                  osd_alert_list_t& alerts);
  osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
  void inc_osd_stat_repaired(void);
  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
  osd_stat_t get_osd_stat() {
    std::lock_guard l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  uint64_t get_osd_stat_seq() {
    std::lock_guard l(stat_lock);
    return osd_stat.seq;
  }
  void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
  {
    std::lock_guard l(stat_lock);
    *pp = osd_stat.hb_pingtime;
    return;
  }

  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(std::string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio, physical_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
  bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
public:
  void check_full_status(float ratio, float pratio);
  s_names recalc_full_state(float ratio, float pratio, std::string &inject);
  bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
  bool check_failsafe_full(DoutPrefixProvider *dpp) const;
  bool check_full(DoutPrefixProvider *dpp) const;
  bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
  bool check_backfill_full(DoutPrefixProvider *dpp) const;
  bool check_nearfull(DoutPrefixProvider *dpp) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);


  // -- epochs --
private:
  // protects access to boot_epoch, up_epoch, bind_epoch
  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs. Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                  const epoch_t *_bind_epoch);
  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  void request_osdmap_update(epoch_t e);

  // -- heartbeats --
  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");

  /// osd -> heartbeat stamps
  std::vector<HeartbeatStampsRef> hb_stamps;

  /// get or create a ref for a peer's HeartbeatStamps
  HeartbeatStampsRef get_hb_stamps(unsigned osd);


  // Timer for readable leases
  ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};

  void queue_renew_lease(epoch_t epoch, spg_t spgid);

  // -- stopping --
  ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
  ceph::condition_variable is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING };
  std::atomic<int> state{NOT_STOPPING};
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();


#ifdef PG_DEBUG_REFS
  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
  std::map<spg_t, int> pgid_tracker;
  std::map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg);
  void remove_pgid(spg_t pgid, PG *pg);
  void dump_live_pgids();
#endif

  explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
  ~OSDService() = default;
};

/*

  Each PG slot includes queues for events that are processing and/or waiting
  for a PG to be materialized in the slot.

  These are the constraints:

  - client ops must remain ordered by client, regardless of map epoch
  - peering messages/events from peers must remain ordered by peer
  - peering messages and client ops need not be ordered relative to each other

  - some peering events can create a pg (e.g., notify)
  - the query peering event can proceed when a PG doesn't exist

  Implementation notes:

  - everybody waits for split.  If the OSD has the parent PG it will instantiate
    the PGSlot early and mark it waiting_for_split.  Everything will wait until
    the parent is able to commit the split operation and the child PG's are
    materialized in the child slots.

  - every event has an epoch property and will wait for the OSDShard to catch
    up to that epoch.  For example, if we get a peering event from a future
    epoch, the event will wait in the slot until the local OSD has caught up.
    (We should be judicious in specifying the required epoch [by, e.g., setting
    it to the same_interval_since epoch] so that we don't wait for epochs that
    don't affect the given PG.)

  - we maintain two separate wait lists, *waiting* and *waiting_peering*.  The
    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
    peering events are queued up by epoch required.

  - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
    materialized the PG), we wake *all* waiting items.  (This could be optimized,
    probably, but we don't bother.)  We always requeue peering items ahead of
    client ops.

  - some peering events are marked !peering_requires_pg (PGQuery).  If we do
    not have a PG these are processed immediately (under the shard lock).

  - if we do not have a PG present, we check if the slot maps to the current
    host.  If so, we either queue the item and wait for the PG to materialize,
    or (if the event is a pg creating event like PGNotify), we materialize the
    PG.

  - when we advance the osdmap on the OSDShard, we scan pg slots and
    discard any slots with no pg (and not waiting_for_split) that no
    longer map to the current host.

 */

struct OSDShardPGSlot {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
  PGRef pg;                               ///< pg reference
  std::deque<OpSchedulerItem> to_process; ///< order items for this slot
  int num_running = 0;                    ///< _process threads doing pg lookup/lock

  std::deque<OpSchedulerItem> waiting;    ///< waiting for pg (or map + pg)

  /// waiting for map (peering evt)
  std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;

  /// incremented by wake_pg_waiters; indicates racing _process threads
  /// should bail out (their op has been requeued)
  uint64_t requeue_seq = 0;

  /// waiting for split child to materialize in these epoch(s)
  std::set<epoch_t> waiting_for_split;

  epoch_t epoch = 0;
  boost::intrusive::set_member_hook<> pg_epoch_item;

  /// waiting for a merge (source or target) by this epoch
  epoch_t waiting_for_merge_epoch = 0;
};
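
/*
 * (illustrative) a peering event stamped with epoch 340 that arrives while
 * the shard is still at epoch 338 sits in waiting_peering[340] until the
 * shard's osdmap catches up; client ops instead queue on `waiting` in
 * arrival order.
 */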

struct OSDShard {
  const unsigned shard_id;
  CephContext *cct;
  OSD *osd;

  std::string shard_name;

  std::string sdata_wait_lock_name;
  ceph::mutex sdata_wait_lock;
  ceph::condition_variable sdata_cond;
  int waiting_threads = 0;

  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
  OSDMapRef shard_osdmap;

  OSDMapRef get_osdmap() {
    std::lock_guard l(osdmap_lock);
    return shard_osdmap;
  }

  std::string shard_lock_name;
  ceph::mutex shard_lock;   ///< protects remaining members below

  /// map of slots for each spg_t.  maintains ordering of items dequeued
  /// from scheduler while _process thread drops shard lock to acquire the
  /// pg lock.  stale slots are removed by consume_map.
  std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;

  struct pg_slot_compare_by_epoch {
    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
      return l.epoch < r.epoch;
    }
  };

  /// maintain an ordering of pg slots by pg epoch
  boost::intrusive::multiset<
    OSDShardPGSlot,
    boost::intrusive::member_hook<
      OSDShardPGSlot,
      boost::intrusive::set_member_hook<>,
      &OSDShardPGSlot::pg_epoch_item>,
    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
  int waiting_for_min_pg_epoch = 0;
  ceph::condition_variable min_pg_epoch_cond;

  /// priority queue
  ceph::osd::scheduler::OpSchedulerRef scheduler;

  bool stop_waiting = false;

  ContextQueue context_queue;

  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
  void _detach_pg(OSDShardPGSlot *slot);

  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
  epoch_t get_min_pg_epoch();
  void wait_min_pg_epoch(epoch_t need);

  /// return newest epoch we are waiting for
  epoch_t get_max_waiting_epoch();

  /// push osdmap into shard
  void consume_map(
    const OSDMapRef& osdmap,
    unsigned *pushes_to_free);

  int _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);

  void identify_splits_and_merges(
    const OSDMapRef& as_of_osdmap,
    std::set<std::pair<spg_t,epoch_t>> *split_children,
    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
  void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
  void prime_splits(const OSDMapRef& as_of_osdmap,
                    std::set<std::pair<spg_t,epoch_t>> *pgids);
  void prime_merges(const OSDMapRef& as_of_osdmap,
                    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
  void register_and_wake_split_child(PG *pg);
  void unprime_split_children(spg_t parent, unsigned old_pg_num);
  void update_scheduler_config();
  std::string get_scheduler_type();

  OSDShard(
    int id,
    CephContext *cct,
    OSD *osd);
};

class OSD : public Dispatcher,
            public md_config_obs_t {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;

  /** OSD **/
  // global lock
  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
  SafeTimer tick_timer;    // safe timer (osd_lock)

  // Tick timer for things that do not need osd_lock
  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
  SafeTimer tick_timer_without_osd_lock;
  std::string gss_ktfile_client{};

public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set <std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:

  const double OSD_TICK_INTERVAL = { 1.0 };
  double get_tick_interval() const;

  Messenger   *cluster_messenger;
  Messenger   *client_messenger;
  Messenger   *objecter_messenger;
  MonClient   *monc; // check the "monc helpers" list before accessing directly
  MgrClient   mgrc;
  PerfCounters      *logger;
  PerfCounters      *recoverystate_perf;
  std::unique_ptr<ObjectStore> store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  ceph_release_t last_require_osd_release{ceph_release_t::unknown};

  int numa_node = -1;
  size_t numa_cpu_set_size = 0;
  cpu_set_t numa_cpu_set;

  bool store_is_rotational = true;
  bool journal_is_rotational = true;

  ZTracer::Endpoint trace_endpoint;
  PerfCounters* create_logger();
  PerfCounters* create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features();

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  using PGRefOrError = std::tuple<std::optional<PGRef>, int>;
  PGRefOrError locate_asok_target(const cmdmap_t& cmdmap,
                                  std::stringstream& ss, bool only_primary);
  int asok_route_to_pg(bool only_primary,
                       std::string_view prefix,
                       cmdmap_t cmdmap,
                       Formatter *f,
                       std::stringstream& ss,
                       const bufferlist& inbl,
                       bufferlist& outbl,
                       std::function<void(int, const std::string&, bufferlist&)> on_finish);
  void asok_command(
    std::string_view prefix,
    const cmdmap_t& cmdmap,
    ceph::Formatter *f,
    const ceph::buffer::list& inbl,
    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);

public:
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
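  // (illustrative) epoch 42 yields the object names "osdmap.42" and
  // "inc_osdmap.42" respectively.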

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("snapmapper"),
        0)));
  }
  static ghobject_t make_purged_snaps_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("purged_snaps"),
        0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    std::stringstream ss;
    ss << "pglog_" << pg;
    std::string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    std::stringstream ss;
    ss << "pginfo_" << pg;
    std::string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }

  static ghobject_t make_final_pool_info_oid(int64_t pool) {
    return ghobject_t(
      hobject_t(
        sobject_t(
          object_t(std::string("final_pool_") + stringify(pool)),
          CEPH_NOSNAP)));
  }

  static ghobject_t make_pg_num_history_oid() {
    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
  }

  static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
                                          coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD.  Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();


private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- config settings --
  float m_osd_pg_epoch_max_lag_factor;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic<int> state{STATE_INITIALIZING};

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:

  ShardedThreadPool osd_op_tp;

  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);

  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
  std::set<ceph::ref_t<Session>> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
    std::lock_guard l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.insert(session);
  }
  void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.erase(session);
  }
  void dispatch_sessions_waiting_on_map() {
    std::set<ceph::ref_t<Session>> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
      std::lock_guard l{(*i)->session_dispatch_lock};
      dispatch_session_waiting(*i, get_osdmap());
    }
  }
  void session_handle_reset(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   *  the monitors; false if an outstanding request that encompasses it is
   *  sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
  epoch_t latest_subscribed_epoch{0};

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                  ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;          ///< time we sent our first ping request
    utime_t last_tx;           ///< last time we sent a ping request
    utime_t last_rx_front;     ///< last time we got a ping reply on the front side
    utime_t last_rx_back;      ///< last time we got a ping reply on the back side
    epoch_t epoch;             ///< most recent epoch we wanted this peer
    /// number of connections we send and receive heartbeat pings/replies
    static constexpr int HEARTBEAT_MAX_CONN = 2;
    /// history of inflight pings, arranged by the timestamp we sent
    /// send time -> deadline -> remaining replies
    std::map<utime_t, std::pair<utime_t, int>> ping_history;
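    // (illustrative) a ping sent at t=100 with a 20s grace, still awaiting
    // replies on both the front and back connections, appears as
    //   ping_history[100] == { 120, 2 };
    // is_unhealthy() fires once `now` passes the oldest such deadline.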

    utime_t hb_interval_start;
    uint32_t hb_average_count = 0;
    uint32_t hb_index = 0;

    uint32_t hb_total_back = 0;
    uint32_t hb_min_back = UINT_MAX;
    uint32_t hb_max_back = 0;
    std::vector<uint32_t> hb_back_pingtime;
    std::vector<uint32_t> hb_back_min;
    std::vector<uint32_t> hb_back_max;

    uint32_t hb_total_front = 0;
    uint32_t hb_min_front = UINT_MAX;
    uint32_t hb_max_front = 0;
    std::vector<uint32_t> hb_front_pingtime;
    std::vector<uint32_t> hb_front_min;
    std::vector<uint32_t> hb_front_max;

    bool is_stale(utime_t stale) const {
      if (ping_history.empty()) {
        return false;
      }
      utime_t oldest_deadline = ping_history.begin()->second.first;
      return oldest_deadline <= stale;
    }

    bool is_unhealthy(utime_t now) const {
      if (ping_history.empty()) {
        /// we haven't sent a ping yet or we have got all replies;
        /// either way we are safe and healthy for now
        return false;
      }

      utime_t oldest_deadline = ping_history.begin()->second.first;
      return now > oldest_deadline;
    }

    bool is_healthy(utime_t now) const {
      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
        // only declare ourselves healthy once we have received the first
        // replies from both front/back connections
        return false;
      }
      return !is_unhealthy(now);
    }

    void clear_mark_down(Connection *except = nullptr) {
      if (con_back && con_back != except) {
        con_back->mark_down();
        con_back->clear_priv();
        con_back.reset(nullptr);
      }
      if (con_front && con_front != except) {
        con_front->mark_down();
        con_front->clear_priv();
        con_front.reset(nullptr);
      }
    }
  };

  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
  std::map<int, int> debug_heartbeat_drops_remaining;
  ceph::condition_variable heartbeat_cond;
  bool heartbeat_stop;
  std::atomic<bool> heartbeat_need_update;
  std::map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
  utime_t last_mon_heartbeat;
  Messenger *hb_front_client_messenger;
  Messenger *hb_back_client_messenger;
  Messenger *hb_front_server_messenger;
  Messenger *hb_back_server_messenger;
  utime_t last_heartbeat_resample;   ///< last time we chose random peers in waiting-for-healthy state
  double daily_loadavg;
  ceph::mono_time startup_time;

  // Track ping response times using a vector as a circular buffer
  // MUST BE A POWER OF 2
  const uint32_t hb_vector_size = 16;
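  // (assumption, for illustration) a power-of-two size lets the circular
  // index wrap with a cheap mask, e.g. slot = hb_index & (hb_vector_size - 1),
  // rather than a modulo.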

  void _add_heartbeat_peer(int p);
  void _remove_heartbeat_peer(int p);
  bool heartbeat_reset(Connection *con);
  void maybe_update_heartbeat_peers();
  void reset_heartbeat_peers(bool all);
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  void heartbeat_kick() {
    std::lock_guard l(heartbeat_lock);
    heartbeat_cond.notify_all();
  }

  struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
      osd->heartbeat_entry();
      return 0;
    }
  } heartbeat_thread;

public:
  bool heartbeat_dispatch(Message *m);

  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
        return true;
      default:
        return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    int ms_handle_authentication(Connection *con) override {
      return true;
    }
  } heartbeat_dispatcher;

private:
  // -- waiters --
  std::list<OpRequestRef> finished;

  void take_waiters(std::list<OpRequestRef>& ls) {
    ceph_assert(ceph_mutex_is_locked(osd_lock));
    finished.splice(finished.end(), ls);
  }
  void do_waiters();

  // -- op tracking --
  OpTracker op_tracker;
  void test_ops(std::string command, std::string args, std::ostream& ss);
  friend class TestOpsSocketHook;
  TestOpsSocketHook *test_ops_hook;
  friend struct C_FinishSplits;
  friend struct C_OpenPGs;

protected:

1565 /*
1566 * The ordered op delivery chain is:
1567 *
9f95a23c
TL
1568 * fast dispatch -> scheduler back
1569 *                  scheduler front <-> to_process back
7c673cae
FG
1570 *                                      to_process front -> RunVis(item)
1571 *                                                        <- queue_front()
1572 *
9f95a23c
TL
1573 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1574 * pushed back up into to_process and/or scheduler while order is preserved.
7c673cae
FG
1575 *
1576 * Multiple worker threads can operate on each shard.
1577 *
11fdf7f2 1578 * Under normal circumstances, num_running == to_process.size(). There are
7c673cae
FG
1579 * two times when that is not true: (1) when waiting_for_pg == true and
1580 * to_process is accumulating requests that are waiting for the pg to be
1581 * instantiated; in that case they will all get requeued together by
1582 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
1583 * is false, and we have already requeued the items.
1584 */
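/*
 * Rough shape of the consumer side, under the assumptions above (a sketch;
 * the authoritative logic is ShardedOpWQ::_process() in OSD.cc; slot_for()
 * and run() are hypothetical stand-ins for the real slot lookup and RunVis):
 *
 *   auto item = sdata->scheduler->dequeue();          // scheduler front
 *   auto& slot = slot_for(item.get_ordering_token()); // per-pg slot
 *   slot.to_process.push_back(std::move(item));
 *   while (!slot.to_process.empty() && !slot.waiting_for_pg) {
 *     run(slot.to_process.front());                   // to_process front
 *     slot.to_process.pop_front();
 *   }
 */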
9f95a23c
TL
1585 friend class ceph::osd::scheduler::PGOpItem;
1586 friend class ceph::osd::scheduler::PGPeeringItem;
1587 friend class ceph::osd::scheduler::PGRecovery;
f67539c2 1588 friend class ceph::osd::scheduler::PGRecoveryMsg;
9f95a23c 1589 friend class ceph::osd::scheduler::PGDelete;
224ce89b 1590
7c673cae 1591 class ShardedOpWQ
9f95a23c 1592 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
7c673cae 1593 {
7c673cae 1594 OSD *osd;
1d09f67e 1595 bool m_fast_shutdown = false;
7c673cae 1596 public:
11fdf7f2 1597 ShardedOpWQ(OSD *o,
f67539c2
TL
1598 ceph::timespan ti,
1599 ceph::timespan si,
7c673cae 1600 ShardedThreadPool* tp)
9f95a23c 1601 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
11fdf7f2 1602 osd(o) {
7c673cae
FG
1603 }
1604
11fdf7f2
TL
1605 void _add_slot_waiter(
1606 spg_t token,
1607 OSDShardPGSlot *slot,
9f95a23c 1608 OpSchedulerItem&& qi);
7c673cae
FG
1609
1610 /// try to do some work
f67539c2 1611 void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
7c673cae 1612
1d09f67e
TL
1613 void stop_for_fast_shutdown();
1614
7c673cae 1615 /// enqueue a new item
9f95a23c 1616 void _enqueue(OpSchedulerItem&& item) override;
7c673cae
FG
1617
1618 /// requeue an old item (at the front of the line)
9f95a23c 1619 void _enqueue_front(OpSchedulerItem&& item) override;
f67539c2 1620
7c673cae 1621 void return_waiting_threads() override {
11fdf7f2
TL
1622 for (uint32_t i = 0; i < osd->num_shards; i++) {
1623 OSDShard* sdata = osd->shards[i];
1624 ceph_assert(NULL != sdata);
1625 std::scoped_lock l{sdata->sdata_wait_lock};
1626 sdata->stop_waiting = true;
1627 sdata->sdata_cond.notify_all();
7c673cae
FG
1628 }
1629 }
1630
11fdf7f2
TL
1631 void stop_return_waiting_threads() override {
1632 for (uint32_t i = 0; i < osd->num_shards; i++) {
1633 OSDShard* sdata = osd->shards[i];
7c673cae 1634 ceph_assert(NULL != sdata);
11fdf7f2
TL
1635 std::scoped_lock l{sdata->sdata_wait_lock};
1636 sdata->stop_waiting = false;
1637 }
1638 }
1639
f67539c2 1640 void dump(ceph::Formatter *f) {
11fdf7f2
TL
1641 for (uint32_t i = 0; i < osd->num_shards; i++) {
1642 auto &&sdata = osd->shards[i];
1643
1644 char queue_name[32] = {0};
1645 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1646 ceph_assert(NULL != sdata);
1647
1648 std::scoped_lock l{sdata->shard_lock};
1649 f->open_object_section(queue_name);
9f95a23c 1650 sdata->scheduler->dump(*f);
7c673cae 1651 f->close_section();
7c673cae
FG
1652 }
1653 }
1654
11fdf7f2
TL
1655 bool is_shard_empty(uint32_t thread_index) override {
1656 uint32_t shard_index = thread_index % osd->num_shards;
1657 auto &&sdata = osd->shards[shard_index];
1658 ceph_assert(sdata);
1659 std::lock_guard l(sdata->shard_lock);
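// threads with index < num_shards also drain their shard's context_queue;
// any additional threads mapped onto the same shard watch only the scheduler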
1660 if (thread_index < osd->num_shards) {
9f95a23c 1661 return sdata->scheduler->empty() && sdata->context_queue.empty();
11fdf7f2 1662 } else {
9f95a23c 1663 return sdata->scheduler->empty();
7c673cae 1664 }
11fdf7f2 1665 }
7c673cae 1666
f67539c2 1667 void handle_oncommits(std::list<Context*>& oncommits) {
11fdf7f2
TL
1668 for (auto p : oncommits) {
1669 p->complete(0);
1670 }
7c673cae
FG
1671 }
1672 } op_shardedwq;
1673
1674
11fdf7f2 1675 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
7c673cae
FG
1676 void dequeue_op(
1677 PGRef pg, OpRequestRef op,
1678 ThreadPool::TPHandle &handle);
1679
11fdf7f2
TL
1680 void enqueue_peering_evt(
1681 spg_t pgid,
1682 PGPeeringEventRef ref);
11fdf7f2
TL
1683 void dequeue_peering_evt(
1684 OSDShard *sdata,
1685 PG *pg,
1686 PGPeeringEventRef ref,
1687 ThreadPool::TPHandle& handle);
1688
1689 void dequeue_delete(
1690 OSDShard *sdata,
1691 PG *pg,
1692 epoch_t epoch,
1693 ThreadPool::TPHandle& handle);
7c673cae
FG
1694
1695 friend class PG;
f67539c2 1696 friend struct OSDShard;
7c673cae 1697 friend class PrimaryLogPG;
f67539c2 1698 friend class PgScrubber;
7c673cae
FG
1699
1700
1701 protected:
1702
1703 // -- osd map --
9f95a23c
TL
1704 // TODO: switch to std::atomic<OSDMapRef> when C++20 is available.
1705 OSDMapRef _osdmap;
1706 void set_osdmap(OSDMapRef osdmap) {
1707 std::atomic_store(&_osdmap, osdmap);
1708 }
1709 OSDMapRef get_osdmap() const {
1710 return std::atomic_load(&_osdmap);
7c673cae 1711 }
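// What the TODO above would reduce to once std::atomic<std::shared_ptr<T>>
// is available (a sketch, not compiled here):
//
//   std::atomic<OSDMapRef> _osdmap;
//   void set_osdmap(OSDMapRef m) { _osdmap.store(std::move(m)); }
//   OSDMapRef get_osdmap() const { return _osdmap.load(); }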
224ce89b 1712 epoch_t get_osdmap_epoch() const {
9f95a23c
TL
1713 // XXX: performance?
1714 auto osdmap = get_osdmap();
7c673cae
FG
1715 return osdmap ? osdmap->get_epoch() : 0;
1716 }
1717
11fdf7f2
TL
1718 pool_pg_num_history_t pg_num_history;
1719
9f95a23c 1720 ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
f67539c2
TL
1721 std::list<OpRequestRef> waiting_for_osdmap;
1722 std::deque<utime_t> osd_markdown_log;
7c673cae
FG
1723
1724 friend struct send_map_on_destruct;
1725
1726 void wait_for_new_map(OpRequestRef op);
1727 void handle_osd_map(class MOSDMap *m);
1728 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1729 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1730 void note_down_osd(int osd);
1731 void note_up_osd(int osd);
f67539c2 1732 friend struct C_OnMapCommit;
7c673cae
FG
1733
1734 bool advance_pg(
11fdf7f2
TL
1735 epoch_t advance_to,
1736 PG *pg,
7c673cae 1737 ThreadPool::TPHandle &handle,
9f95a23c 1738 PeeringCtx &rctx);
7c673cae
FG
1739 void consume_map();
1740 void activate_map();
1741
1742 // osd map cache (past osd maps)
1743 OSDMapRef get_map(epoch_t e) {
1744 return service.get_map(e);
1745 }
1746 OSDMapRef add_map(OSDMap *o) {
1747 return service.add_map(o);
1748 }
f67539c2 1749 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
7c673cae
FG
1750 return service.get_map_bl(e, bl);
1751 }
11fdf7f2
TL
1752
1753public:
1754 // -- shards --
f67539c2 1755 std::vector<OSDShard*> shards;
11fdf7f2
TL
1756 uint32_t num_shards = 0;
1757
1758 void inc_num_pgs() {
1759 ++num_pgs;
1760 }
1761 void dec_num_pgs() {
1762 --num_pgs;
1763 }
1764 int get_num_pgs() const {
1765 return num_pgs;
7c673cae
FG
1766 }
1767
1768protected:
9f95a23c 1769 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
11fdf7f2 1770 /// merge epoch -> target pgid -> source pgid -> pg
f67539c2 1771 std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
11fdf7f2
TL
1772
1773 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1774 unsigned need);
1775
7c673cae 1776 // -- placement groups --
11fdf7f2 1777 std::atomic<size_t> num_pgs = {0};
7c673cae 1778
3efd9988 1779 std::mutex pending_creates_lock;
9f95a23c 1780 using create_from_osd_t = std::pair<spg_t, bool /* is primary */>;
b32b8144 1781 std::set<create_from_osd_t> pending_creates_from_osd;
3efd9988
FG
1782 unsigned pending_creates_from_mon = 0;
1783
7c673cae
FG
1784 PGRecoveryStats pg_recovery_stats;
1785
11fdf7f2
TL
1786 PGRef _lookup_pg(spg_t pgid);
1787 PGRef _lookup_lock_pg(spg_t pgid);
1788 void register_pg(PGRef pg);
1789 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
7c673cae 1790
f67539c2
TL
1791 void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
1792 void _get_pgids(std::vector<spg_t> *v);
31f18b77
FG
1793
1794public:
11fdf7f2 1795 PGRef lookup_lock_pg(spg_t pgid);
31f18b77 1796
11fdf7f2 1797 std::set<int64_t> get_mapped_pools();
35e4c445 1798
31f18b77 1799protected:
7c673cae 1800 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
7c673cae 1801
11fdf7f2
TL
1802 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1803 spg_t pgid, bool is_mon_create);
3efd9988
FG
1804 void resume_creating_pg();
1805
7c673cae 1806 void load_pgs();
7c673cae
FG
1807
1808 /// build initial pg history and intervals on create
1809 void build_initial_pg_history(
1810 spg_t pgid,
1811 epoch_t created,
1812 utime_t created_stamp,
1813 pg_history_t *h,
1814 PastIntervals *pi);
1815
7c673cae
FG
1816 epoch_t last_pg_create_epoch;
1817
1818 void handle_pg_create(OpRequestRef op);
1819
1820 void split_pgs(
1821 PG *parent,
f67539c2 1822 const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
7c673cae
FG
1823 OSDMapRef curmap,
1824 OSDMapRef nextmap,
9f95a23c 1825 PeeringCtx &rctx);
f67539c2 1826 void _finish_splits(std::set<PGRef>& pgs);
7c673cae
FG
1827
1828 // == monitor interaction ==
9f95a23c 1829 ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
7c673cae 1830 utime_t last_mon_report;
11fdf7f2 1831 Finisher boot_finisher;
7c673cae
FG
1832
1833 // -- boot --
1834 void start_boot();
1835 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
1836 void _preboot(epoch_t oldest, epoch_t newest);
1837 void _send_boot();
f67539c2 1838 void _collect_metadata(std::map<std::string,std::string> *pmeta);
9f95a23c
TL
1839 void _get_purged_snaps();
1840 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
7c673cae
FG
1841
1842 void start_waiting_for_healthy();
1843 bool _is_healthy();
1844
1845 void send_full_update();
f67539c2
TL
1846
1847 friend struct CB_OSD_GetVersion;
7c673cae
FG
1848
1849 // -- alive --
1850 epoch_t up_thru_wanted;
1851
1852 void queue_want_up_thru(epoch_t want);
1853 void send_alive();
1854
1855 // -- full map requests --
1856 epoch_t requested_full_first, requested_full_last;
1857
1858 void request_full_map(epoch_t first, epoch_t last);
1859 void rerequest_full_maps() {
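// zero the recorded range first so that request_full_map() below treats
// this as a fresh request rather than one already in flight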
1860 epoch_t first = requested_full_first;
1861 epoch_t last = requested_full_last;
1862 requested_full_first = 0;
1863 requested_full_last = 0;
1864 request_full_map(first, last);
1865 }
1866 void got_full_map(epoch_t e);
1867
1868 // -- failures --
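// roughly (a reading of the call sites in OSD.cc): failure_queue holds
// peers we consider down but have not yet reported (osd -> time first
// detected), while failure_pending holds reports already sent to the mon,
// retained so a recovered peer can be un-reported via send_still_alive()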
f67539c2
TL
1869 std::map<int,utime_t> failure_queue;
1870 std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
7c673cae
FG
1871
1872 void requeue_failures();
1873 void send_failures();
11fdf7f2
TL
1874 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
1875 void cancel_pending_failures();
7c673cae
FG
1876
1877 ceph::coarse_mono_clock::time_point last_sent_beacon;
9f95a23c 1878 ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
7c673cae
FG
1879 epoch_t min_last_epoch_clean = 0;
1880 // which pgs were scanned for min_lec
1881 std::vector<pg_t> min_last_epoch_clean_pgs;
1882 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
1883
7c673cae
FG
1884 ceph_tid_t get_tid() {
1885 return service.get_tid();
1886 }
1887
1888 // -- generic pg peering --
9f95a23c 1889 void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
7c673cae 1890 ThreadPool::TPHandle *handle = NULL);
7c673cae
FG
1891
1892 bool require_mon_peer(const Message *m);
1893 bool require_mon_or_mgr_peer(const Message *m);
1894 bool require_osd_peer(const Message *m);
1895 /***
1896 * Verifies that we were alive in the given epoch, and that
1897 * we still are.
1898 */
1899 bool require_self_aliveness(const Message *m, epoch_t alive_since);
1900 /**
1901 * Verifies that the OSD who sent the given op has the same
f67539c2 1902 * address as in the given map.
7c673cae
FG
1903 * @pre op was sent by an OSD using the cluster messenger
1904 */
9f95a23c 1905 bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
7c673cae
FG
1906 bool is_fast_dispatch);
1907
1908 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
1909 bool is_fast_dispatch);
1910
11fdf7f2 1911 void handle_fast_pg_create(MOSDPGCreate2 *m);
11fdf7f2
TL
1912 void handle_pg_query_nopg(const MQuery& q);
1913 void handle_fast_pg_notify(MOSDPGNotify *m);
1914 void handle_pg_notify_nopg(const MNotifyRec& q);
1915 void handle_fast_pg_info(MOSDPGInfo *m);
1916 void handle_fast_pg_remove(MOSDPGRemove *m);
7c673cae 1917
11fdf7f2
TL
1918public:
1919 // used by OSDShard
1920 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
1921protected:
c07f9fc5 1922
11fdf7f2 1923 void handle_fast_force_recovery(MOSDForceRecovery *m);
7c673cae
FG
1924
1925 // -- commands --
7c673cae 1926 void handle_command(class MCommand *m);
11fdf7f2 1927
7c673cae
FG
1928
1929 // -- pg recovery --
1930 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
1931 ThreadPool::TPHandle &handle);
1932
1933
1934 // -- scrubbing --
1935 void sched_scrub();
494da23a 1936 void resched_all_scrubs();
7c673cae 1937 bool scrub_random_backoff();
7c673cae 1938
b32b8144
FG
1939 // -- status reporting --
1940 MPGStats *collect_pg_stats();
11fdf7f2
TL
1941 std::vector<DaemonHealthMetric> get_health_metrics();
1942
b32b8144 1943
224ce89b 1944private:
7c673cae
FG
1945 bool ms_can_fast_dispatch_any() const override { return true; }
1946 bool ms_can_fast_dispatch(const Message *m) const override {
1947 switch (m->get_type()) {
11fdf7f2 1948 case CEPH_MSG_PING:
7c673cae
FG
1949 case CEPH_MSG_OSD_OP:
1950 case CEPH_MSG_OSD_BACKOFF:
11fdf7f2
TL
1951 case MSG_OSD_SCRUB2:
1952 case MSG_OSD_FORCE_RECOVERY:
1953 case MSG_MON_COMMAND:
1954 case MSG_OSD_PG_CREATE2:
1955 case MSG_OSD_PG_QUERY:
9f95a23c 1956 case MSG_OSD_PG_QUERY2:
11fdf7f2 1957 case MSG_OSD_PG_INFO:
9f95a23c 1958 case MSG_OSD_PG_INFO2:
11fdf7f2 1959 case MSG_OSD_PG_NOTIFY:
9f95a23c 1960 case MSG_OSD_PG_NOTIFY2:
11fdf7f2
TL
1961 case MSG_OSD_PG_LOG:
1962 case MSG_OSD_PG_TRIM:
1963 case MSG_OSD_PG_REMOVE:
1964 case MSG_OSD_BACKFILL_RESERVE:
1965 case MSG_OSD_RECOVERY_RESERVE:
7c673cae 1966 case MSG_OSD_REPOP:
7c673cae
FG
1967 case MSG_OSD_REPOPREPLY:
1968 case MSG_OSD_PG_PUSH:
1969 case MSG_OSD_PG_PULL:
1970 case MSG_OSD_PG_PUSH_REPLY:
1971 case MSG_OSD_PG_SCAN:
1972 case MSG_OSD_PG_BACKFILL:
1973 case MSG_OSD_PG_BACKFILL_REMOVE:
1974 case MSG_OSD_EC_WRITE:
1975 case MSG_OSD_EC_WRITE_REPLY:
1976 case MSG_OSD_EC_READ:
1977 case MSG_OSD_EC_READ_REPLY:
1978 case MSG_OSD_SCRUB_RESERVE:
1979 case MSG_OSD_REP_SCRUB:
1980 case MSG_OSD_REP_SCRUBMAP:
1981 case MSG_OSD_PG_UPDATE_LOG_MISSING:
1982 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
c07f9fc5
FG
1983 case MSG_OSD_PG_RECOVERY_DELETE:
1984 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
9f95a23c
TL
1985 case MSG_OSD_PG_LEASE:
1986 case MSG_OSD_PG_LEASE_ACK:
7c673cae
FG
1987 return true;
1988 default:
1989 return false;
1990 }
1991 }
1992 void ms_fast_dispatch(Message *m) override;
7c673cae 1993 bool ms_dispatch(Message *m) override;
7c673cae
FG
1994 void ms_handle_connect(Connection *con) override;
1995 void ms_handle_fast_connect(Connection *con) override;
1996 void ms_handle_fast_accept(Connection *con) override;
11fdf7f2 1997 int ms_handle_authentication(Connection *con) override;
7c673cae
FG
1998 bool ms_handle_reset(Connection *con) override;
1999 void ms_handle_remote_reset(Connection *con) override {}
2000 bool ms_handle_refused(Connection *con) override;
2001
7c673cae
FG
2002 public:
2003 /* internal and external can point to the same messenger; they will still
2004 * be cleaned up properly. */
2005 OSD(CephContext *cct_,
20effc67 2006 std::unique_ptr<ObjectStore> store_,
7c673cae
FG
2007 int id,
2008 Messenger *internal,
2009 Messenger *external,
2010 Messenger *hb_front_client,
2011 Messenger *hb_back_client,
2012 Messenger *hb_front_server,
2013 Messenger *hb_back_server,
2014 Messenger *osdc_messenger,
f67539c2
TL
2015 MonClient *mc, const std::string &dev, const std::string &jdev,
2016 ceph::async::io_context_pool& poolctx);
7c673cae
FG
2017 ~OSD() override;
2018
2019 // static bits
20effc67
TL
2020 static int mkfs(CephContext *cct,
2021 std::unique_ptr<ObjectStore> store,
2022 uuid_d fsid,
2023 int whoami,
2024 std::string osdspec_affinity);
11fdf7f2 2025
f67539c2
TL
2026 /* remove any non-user xattrs from a map of them */
2027 void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
2028 for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
7c673cae
FG
2029 iter != attrs.end();
2030 ) {
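// note: the for-header deliberately omits the increment; erase(iter++)
// below advances the iterator before the erased node is invalidated.
// user xattrs are those named "_<name>"; everything else is dropped.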
2031 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2032 attrs.erase(iter++);
2033 else ++iter;
2034 }
2035 }
2036
2037private:
f67539c2 2038 int mon_cmd_maybe_osd_create(std::string &cmd);
7c673cae
FG
2039 int update_crush_device_class();
2040 int update_crush_location();
2041
3efd9988
FG
2042 static int write_meta(CephContext *cct,
2043 ObjectStore *store,
e306af50 2044 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
7c673cae 2045
f67539c2
TL
2046 void handle_scrub(class MOSDScrub *m);
2047 void handle_fast_scrub(class MOSDScrub2 *m);
7c673cae
FG
2048 void handle_osd_ping(class MOSDPing *m);
2049
9f95a23c 2050 size_t get_num_cache_shards();
31f18b77
FG
2051 int get_num_op_shards();
2052 int get_num_op_threads();
2053
c07f9fc5 2054 float get_osd_recovery_sleep();
11fdf7f2 2055 float get_osd_delete_sleep();
494da23a 2056 float get_osd_snap_trim_sleep();
11fdf7f2 2057
9f95a23c 2058 int get_recovery_max_active();
a4b75251 2059 void maybe_override_max_osd_capacity_for_qos();
b3b6e05e 2060 bool maybe_override_options_for_qos();
a4b75251
TL
2061 int run_osd_bench_test(int64_t count,
2062 int64_t bsize,
2063 int64_t osize,
2064 int64_t onum,
2065 double *elapsed,
2066 std::ostream& ss);
2067 int mon_cmd_set_config(const std::string &key, const std::string &val);
20effc67 2068 bool unsupported_objstore_for_qos();
9f95a23c
TL
2069
2070 void scrub_purged_snaps();
f67539c2 2071 void probe_smart(const std::string& devid, std::ostream& ss);
c07f9fc5 2072
7c673cae 2073public:
11fdf7f2 2074 static int peek_meta(ObjectStore *store,
f67539c2 2075 std::string *magic,
11fdf7f2
TL
2076 uuid_d *cluster_fsid,
2077 uuid_d *osd_fsid,
2078 int *whoami,
9f95a23c 2079 ceph_release_t *min_osd_release);
f67539c2 2080
7c673cae
FG
2081
2082 // startup/shutdown
2083 int pre_init();
2084 int init();
2085 void final_init();
2086
2087 int enable_disable_fuse(bool stop);
11fdf7f2 2088 int set_numa_affinity();
7c673cae
FG
2089
2090 void suicide(int exitcode);
2091 int shutdown();
2092
2093 void handle_signal(int signum);
2094
2095 /// check if we can throw out op from a disconnected client
2096 static bool op_is_discardable(const MOSDOp *m);
2097
2098public:
2099 OSDService service;
2100 friend class OSDService;
11fdf7f2
TL
2101
2102private:
9f95a23c
TL
2103 void set_perf_queries(const ConfigPayload &config_payload);
2104 MetricPayload get_perf_reports();
11fdf7f2 2105
9f95a23c 2106 ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
11fdf7f2
TL
2107 std::list<OSDPerfMetricQuery> m_perf_queries;
2108 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
7c673cae
FG
2109};
2110
224ce89b 2111
7c673cae
FG
2112// compatibility of the executable
2113extern const CompatSet::Feature ceph_osd_feature_compat[];
2114extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2115extern const CompatSet::Feature ceph_osd_feature_incompat[];
2116
224ce89b 2117#endif // CEPH_OSD_H