// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/async/context_pool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/config_cacher.h"
#include "common/zipkin_trace.h"
#include "common/ceph_timer.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"

#include "include/CompatSet.h"
#include "include/common_fwd.h"

#include "OpRequest.h"
#include "Session.h"

#include "osd/scheduler/OpScheduler.h"

#include <atomic>
#include <map>
#include <memory>
#include <string>

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
#include "common/Finisher.h"

#define CEPH_OSD_PROTOCOL 10 /* cluster internal */

/*

  lock ordering for pg map

    PG::lock
      ShardData::lock
        OSD::pg_map_lock

 */
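
/*
 * Illustrative note (not upstream text): the ordering above means a
 * thread holding PG::lock may take ShardData::lock, and a thread
 * holding ShardData::lock may take OSD::pg_map_lock, but never the
 * reverse; e.g. taking PG::lock while holding pg_map_lock risks
 * deadlock against a thread acquiring the locks in the listed order.
 */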

class Messenger;
class Message;
class MonClient;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;
class KeyStore;

class Watch;
class PrimaryLogPG;

class TestOpsSocketHook;
struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;

class MOSDPGCreate2;
class MOSDPGQuery;
class MOSDPGNotify;
class MOSDPGInfo;
class MOSDPGRemove;
class MOSDForceRecovery;
class MMonGetPurgedSnapsReply;

class OSD;

class OSDService {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
public:
  OSD *osd;
  CephContext *cct;
  ObjectStore::CollectionHandle meta_ch;
  const int whoami;
  ObjectStore *&store;
  LogClient &log_client;
  LogChannelRef clog;
  PGRecoveryStats &pg_recovery_stats;
private:
  Messenger *&cluster_messenger;
  Messenger *&client_messenger;
public:
  PerfCounters *&logger;
  PerfCounters *&recoverystate_perf;
  MonClient *&monc;

  md_config_cacher_t<Option::size_t> osd_max_object_size;
  md_config_cacher_t<bool> osd_skip_data_digest;

  void enqueue_back(OpSchedulerItem&& qi);
  void enqueue_front(OpSchedulerItem&& qi);

  void maybe_inject_dispatch_delay() {
    if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
      if (rand() % 10000 <
	  g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
	utime_t t;
	t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
	t.sleep();
      }
    }
  }

  ceph::signedspan get_mnow();

private:
  // -- superblock --
  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    std::lock_guard l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    std::lock_guard l(publish_lock);
    superblock = block;
  }

  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    std::lock_guard l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    std::lock_guard l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    std::lock_guard l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre-published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  ceph::condition_variable pre_publish_cond;
  int pre_publish_waiter = 0;

public:
  void pre_publish_map(OSDMapRef map) {
    std::lock_guard l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  std::map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    std::lock_guard l(pre_publish_lock);
    epoch_t e = next_osdmap->get_epoch();
    std::map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(std::make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    std::lock_guard l(pre_publish_lock);
    std::map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    ceph_assert(i != map_reservations.end());
    ceph_assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    if (pre_publish_waiter) {
      pre_publish_cond.notify_all();
    }
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    std::unique_lock l{pre_publish_lock};
    ceph_assert(next_osdmap);
    pre_publish_waiter++;
    pre_publish_cond.wait(l, [this] {
      auto i = map_reservations.cbegin();
      return (i == map_reservations.cend() ||
	      i->first >= next_osdmap->get_epoch());
    });
    pre_publish_waiter--;
  }
  OSDMapRef get_next_osdmap() {
    std::lock_guard l(pre_publish_lock);
    return next_osdmap;
  }

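  /*
   * Usage sketch (illustrative only, not part of upstream): a thread
   * that wants to message peers based on the pre-published map pairs
   * the reserve/release calls so await_reserved_maps() can make
   * progress:
   *
   *   OSDMapRef nextmap = get_nextmap_reserved();
   *   // ... look up peers and send messages using nextmap ...
   *   release_map(nextmap);
   */
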
  void maybe_share_map(Connection *con,
		       const OSDMapRef& osdmap,
		       epoch_t peer_epoch_lb=0);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con,
			    const OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
				     OSDSuperblock& superblock);

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  std::pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
  void send_message_osd_cluster(MessageRef m, Connection *con) {
    con->send_message2(std::move(m));
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() const;

private:
  // -- scrub scheduling --
  ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
  int scrubs_local;
  int scrubs_remote;

public:
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// the time scheduled for the scrub; the scrub can be delayed if
    /// system load is too high or it falls outside the scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
		      const utime_t& timestamp,
		      double pool_scrub_min_interval = 0,
		      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
  std::set<ScrubJob> sched_scrub_pg;

  /// @returns the scrub_reg_stamp used for unregistering the scrub job
  utime_t reg_pg_scrub(spg_t pgid,
		       utime_t t,
		       double pool_scrub_min_interval,
		       double pool_scrub_max_interval,
		       bool must) {
    ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
		       must);
    std::lock_guard l(OSDService::sched_scrub_lock);
    sched_scrub_pg.insert(scrub_job);
    return scrub_job.sched_time;
  }

  void unreg_pg_scrub(spg_t pgid, utime_t t) {
    std::lock_guard l(sched_scrub_lock);
    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
    ceph_assert(removed);
  }

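  /*
   * Illustrative sketch (not upstream code): a PG registers its scrub
   * with a stamp and must unregister with the same stamp it got back,
   * per the @returns note above:
   *
   *   utime_t stamp = reg_pg_scrub(pgid, now, min_int, max_int, must);
   *   // ... later, before rescheduling or on PG removal ...
   *   unreg_pg_scrub(pgid, stamp);
   */
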
  bool first_scrub_stamp(ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    std::set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
    *out = *iter;
    return true;
  }
  bool next_scrub_stamp(const ScrubJob& next,
			ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    std::set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }

  void dumps_scrub(ceph::Formatter* f);

  bool can_inc_scrubs();
  bool inc_scrubs_local();
  void dec_scrubs_local();
  bool inc_scrubs_remote();
  void dec_scrubs_remote();
  void dump_scrub_reservations(ceph::Formatter *f);

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
		      std::vector<pg_log_op_return_item_t> op_returns);
  void handle_misdirected_op(PG *pg, OpRequestRef op);

private:
  // -- agent shared state --
  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
  ceph::condition_variable agent_cond;
  std::map<uint64_t, std::set<PGRef> > agent_queue;
  std::set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count;  // once we have one pg in FLUSH_MODE_HIGH, flush objects at high speed
  std::set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
	agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false;  // inserting higher-priority queue
    std::set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.notify_all();
    nq.insert(pg);
  }

  void _dequeue(PG *pg, uint64_t old_priority) {
    std::set<PGRef>& oq = agent_queue[old_priority];
    std::set<PGRef>::iterator p = oq.find(pg);
    ceph_assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
	agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    std::lock_guard l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    std::lock_guard l(agent_lock);
    ceph_assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    std::lock_guard l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    std::lock_guard l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    agent_cond.notify_all();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ++agent_ops;
    ceph_assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    ceph_assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.notify_all();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    std::lock_guard l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count++;
  }

  void agent_dec_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count--;
  }

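  /*
   * Illustrative agent flow (not upstream code): the tiering agent
   * enables a PG at some priority, brackets each async flush with the
   * start/finish calls so the oid is tracked, and disables the PG when
   * it is done:
   *
   *   agent_enable_pg(pg, priority);
   *   agent_start_op(oid);          // flush in flight for oid
   *   // ... async flush completes or is cancelled ...
   *   agent_finish_op(oid);
   *   agent_disable_pg(pg, priority);
   */
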
private:
  /// throttle promotion attempts
  std::atomic<unsigned int> promote_probability_millis{1000};  ///< probability in thousandths. one word.
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless!  we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
	promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
	promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   // no throttle (promote)
  }
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();
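
  /*
   * Illustrative caller pattern (not upstream code): promotion sites
   * consult the throttle first and report completed promotes so the
   * periodic recalibration can adjust promote_probability_millis:
   *
   *   if (!promote_throttle()) {
   *     // ... start the promote; when it completes:
   *     promote_finish(bytes_promoted);
   *   }
   */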
  unsigned get_num_shards() const {
    return m_objecter_finishers;
  }
  Finisher* get_objecter_finisher(int shard) {
    return objecter_finishers[shard].get();
  }

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  ceph::async::io_context_pool& poolctx;
  std::unique_ptr<Objecter> objecter;
  int m_objecter_finishers;
  std::vector<std::unique_ptr<Finisher>> objecter_finishers;

  // -- Watch --
  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    std::lock_guard l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }

  // -- Recovery/Backfill Request Scheduling --
  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
  SafeTimer recovery_request_timer;

  // For async recovery sleep
  bool recovery_needs_sleep = true;
  ceph::real_clock::time_point recovery_schedule_time;

  // For recovery & scrub & snap
  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
  SafeTimer sleep_timer;

  // -- tids --
  // for ops I issue
  std::atomic<unsigned int> last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t, Finisher> local_reserver;
  AsyncReserver<spg_t, Finisher> remote_reserver;

  // -- pg merge --
  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
  std::map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
  std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
  std::set<pg_t> not_ready_to_merge_source;
  std::map<pg_t,pg_t> not_ready_to_merge_target;
  std::set<pg_t> sent_ready_to_merge_source;

  void set_ready_to_merge_source(PG *pg,
				 eversion_t version);
  void set_ready_to_merge_target(PG *pg,
				 eversion_t version,
				 epoch_t last_epoch_started,
				 epoch_t last_epoch_clean);
  void set_not_ready_to_merge_source(pg_t source);
  void set_not_ready_to_merge_target(pg_t target, pg_t source);
  void clear_ready_to_merge(PG *pg);
  void send_ready_to_merge();
  void _send_ready_to_merge();
  void clear_sent_ready_to_merge();
  void prune_sent_ready_to_merge(const OSDMapRef& osdmap);

  // -- pg_temp --
private:
  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
  struct pg_temp_t {
    std::vector<int> acting;
    bool forced = false;
  };
  std::map<pg_t, pg_temp_t> pg_temp_wanted;
  std::map<pg_t, pg_temp_t> pg_temp_pending;
  void _sent_pg_temp();
  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
  void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
			  bool forced = false);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
  std::set<pg_t> pg_created;
  void send_pg_created(pg_t pgid);
  void prune_pg_created();
  void send_pg_created();

  AsyncReserver<spg_t, Finisher> snap_reserver;
  void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
  void queue_for_snap_trim(PG *pg);
  void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
  void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);

  /// queue the message (-> event) that all replicas reserved scrub resources for us
  void queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority);

  /// queue the message (-> event) that some replicas denied our scrub resources request
  void queue_for_scrub_denied(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals either (a) the end of a sleep period, or (b) a recheck of the availability
  /// of the primary map being created by the backend.
  void queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals a change in the number of in-flight recovery writes
  void queue_scrub_pushes_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that all pending updates were applied
  void queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that the block-range that was locked and prevented the scrubbing has been freed
  void queue_scrub_unblocking(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that all write OPs are done
  void queue_scrub_digest_update(PG* pg, Scrub::scrub_prio_t with_priority);

  /// Signals that we (the Primary) got all waited-for scrub-maps from our replicas
  void queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority);

  void queue_for_rep_scrub(PG* pg,
			   Scrub::scrub_prio_t with_high_priority,
			   unsigned int qu_priority);

  /// Signals a change in the number of in-flight recovery writes
  void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);

  void queue_for_rep_scrub_resched(PG* pg,
				   Scrub::scrub_prio_t with_high_priority,
				   unsigned int qu_priority);

  void queue_for_pg_delete(spg_t pgid, epoch_t e);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

private:
  // -- pg recovery and associated throttling --
  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
  std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;

  /// queue a scrub-related message for a PG
  template <class MSG_TYPE>
  void queue_scrub_event_msg(PG* pg,
			     Scrub::scrub_prio_t with_priority,
			     unsigned int qu_priority);

  /// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
  /// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
  template <class MSG_TYPE>
  void queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority);

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  std::map<spg_t, std::set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes);
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    std::lock_guard l(recovery_lock);
    return recovery_paused;
  }
  void unpause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    std::lock_guard l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);
    awaiting_throttle.remove_if(
      [pg](decltype(awaiting_throttle)::const_reference awaiting) {
	return awaiting.second.get() == pg;
      });
  }

  unsigned get_target_pg_log_entries() const;

  // delayed pg activation
  void queue_for_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);

    if (pg->is_forced_recovery_or_backfill()) {
      awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }
  void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
    std::lock_guard l(recovery_lock);
    _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
  }

  void queue_check_readable(spg_t spgid,
			    epoch_t lpr,
			    ceph::signedspan delay = ceph::signedspan::zero());

7c673cae 717 // osd map cache (past osd maps)
9f95a23c 718 ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
7c673cae 719 SharedLRU<epoch_t, const OSDMap> map_cache;
f67539c2
TL
720 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_cache;
721 SimpleLRU<epoch_t, ceph::buffer::list> map_bl_inc_cache;
7c673cae
FG
722
723 OSDMapRef try_get_map(epoch_t e);
724 OSDMapRef get_map(epoch_t e) {
725 OSDMapRef ret(try_get_map(e));
11fdf7f2 726 ceph_assert(ret);
7c673cae
FG
727 return ret;
728 }
729 OSDMapRef add_map(OSDMap *o) {
11fdf7f2 730 std::lock_guard l(map_cache_lock);
7c673cae
FG
731 return _add_map(o);
732 }
733 OSDMapRef _add_map(OSDMap *o);
734
f67539c2
TL
735 void _add_map_bl(epoch_t e, ceph::buffer::list& bl);
736 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
11fdf7f2 737 std::lock_guard l(map_cache_lock);
7c673cae
FG
738 return _get_map_bl(e, bl);
739 }
f67539c2 740 bool _get_map_bl(epoch_t e, ceph::buffer::list& bl);
7c673cae 741
f67539c2
TL
742 void _add_map_inc_bl(epoch_t e, ceph::buffer::list& bl);
743 bool get_inc_map_bl(epoch_t e, ceph::buffer::list& bl);
7c673cae 744
11fdf7f2
TL
745 /// identify split child pgids over a osdmap interval
746 void identify_splits_and_merges(
747 OSDMapRef old_map,
748 OSDMapRef new_map,
749 spg_t pgid,
f67539c2
TL
750 std::set<std::pair<spg_t,epoch_t>> *new_children,
751 std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
11fdf7f2
TL
752
753 void need_heartbeat_peer_update();
7c673cae
FG
754
755 void init();
f67539c2 756 void final_init();
7c673cae 757 void start_shutdown();
31f18b77 758 void shutdown_reserver();
7c673cae
FG
759 void shutdown();
760
  // -- stats --
  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
  osd_stat_t osd_stat;
  uint32_t seq = 0;

  void set_statfs(const struct store_statfs_t &stbuf,
		  osd_alert_list_t& alerts);
  osd_stat_t set_osd_stat(std::vector<int>& hb_peers, int num_pgs);
  void inc_osd_stat_repaired(void);
  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
  osd_stat_t get_osd_stat() {
    std::lock_guard l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  uint64_t get_osd_stat_seq() {
    std::lock_guard l(stat_lock);
    return osd_stat.seq;
  }
  void get_hb_pingtime(std::map<int, osd_stat_t::Interfaces> *pp)
  {
    std::lock_guard l(stat_lock);
    *pp = osd_stat.hb_pingtime;
  }

  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(std::string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio, physical_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
  bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
public:
  void check_full_status(float ratio, float pratio);
  s_names recalc_full_state(float ratio, float pratio, std::string &inject);
  bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
  bool check_failsafe_full(DoutPrefixProvider *dpp) const;
  bool check_full(DoutPrefixProvider *dpp) const;
  bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
  bool check_backfill_full(DoutPrefixProvider *dpp) const;
  bool check_nearfull(DoutPrefixProvider *dpp) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);


  // -- epochs --
private:
  // protects access to boot_epoch, up_epoch, bind_epoch
  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
		       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs. Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
		  const epoch_t *_bind_epoch);
  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  void request_osdmap_update(epoch_t e);

  // -- heartbeats --
  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDService::hb_stamp_lock");

  /// osd -> heartbeat stamps
  std::vector<HeartbeatStampsRef> hb_stamps;

  /// get or create a ref for a peer's HeartbeatStamps
  HeartbeatStampsRef get_hb_stamps(unsigned osd);


  // Timer for readable leases
  ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};

  void queue_renew_lease(epoch_t epoch, spg_t spgid);

  // -- stopping --
  ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
  ceph::condition_variable is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING
  };
  std::atomic<int> state{NOT_STOPPING};
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();


#ifdef PG_DEBUG_REFS
  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
  std::map<spg_t, int> pgid_tracker;
  std::map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg);
  void remove_pgid(spg_t pgid, PG *pg);
  void dump_live_pgids();
#endif

  explicit OSDService(OSD *osd, ceph::async::io_context_pool& poolctx);
  ~OSDService() = default;
};

/*

  Each PG slot includes queues for events that are processing and/or waiting
  for a PG to be materialized in the slot.

  These are the constraints:

  - client ops must remain ordered by client, regardless of map epoch
  - peering messages/events from peers must remain ordered by peer
  - peering messages and client ops need not be ordered relative to each other

  - some peering events can create a pg (e.g., notify)
  - the query peering event can proceed when a PG doesn't exist

  Implementation notes:

  - everybody waits for split.  If the OSD has the parent PG it will instantiate
    the PGSlot early and mark it waiting_for_split.  Everything will wait until
    the parent is able to commit the split operation and the child PGs are
    materialized in the child slots.

  - every event has an epoch property and will wait for the OSDShard to catch
    up to that epoch.  For example, if we get a peering event from a future
    epoch, the event will wait in the slot until the local OSD has caught up.
    (We should be judicious in specifying the required epoch [by, e.g., setting
    it to the same_interval_since epoch] so that we don't wait for epochs that
    don't affect the given PG.)

  - we maintain two separate wait lists, *waiting* and *waiting_peering*.  The
    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
    peering events are queued up by the epoch they require.

  - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
    materialized the PG), we wake *all* waiting items.  (This could be optimized,
    probably, but we don't bother.)  We always requeue peering items ahead of
    client ops.

  - some peering events are marked !peering_requires_pg (PGQuery).  If we do
    not have a PG these are processed immediately (under the shard lock).

  - if we do not have a PG present, we check if the slot maps to the current
    host.  If so, we either queue the item and wait for the PG to materialize,
    or (if the event is a pg creating event like PGNotify) we materialize the PG.

  - when we advance the osdmap on the OSDShard, we scan pg slots and
    discard any slots with no pg (and not waiting_for_split) that no
    longer map to the current host.

 */

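/*
 * Illustrative example of the epoch gating above (not upstream code):
 * a peering event stamped with a future epoch parks in the slot's
 * waiting_peering map, keyed by the epoch it needs:
 *
 *   if (item.is_peering() && item_epoch > shard_osdmap->get_epoch()) {
 *     slot->waiting_peering[item_epoch].push_back(std::move(item));
 *   }   // requeued for processing once consume_map() catches up
 */
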
struct OSDShardPGSlot {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
  PGRef pg;                                ///< pg reference
  std::deque<OpSchedulerItem> to_process;  ///< ordered items for this slot
  int num_running = 0;                     ///< _process threads doing pg lookup/lock

  std::deque<OpSchedulerItem> waiting;     ///< waiting for pg (or map + pg)

  /// waiting for map (peering evt)
  std::map<epoch_t,std::deque<OpSchedulerItem>> waiting_peering;

  /// incremented by wake_pg_waiters; indicates racing _process threads
  /// should bail out (their op has been requeued)
  uint64_t requeue_seq = 0;

  /// waiting for split child to materialize in these epoch(s)
  std::set<epoch_t> waiting_for_split;

  epoch_t epoch = 0;
  boost::intrusive::set_member_hook<> pg_epoch_item;

  /// waiting for a merge (source or target) by this epoch
  epoch_t waiting_for_merge_epoch = 0;
};

struct OSDShard {
  const unsigned shard_id;
  CephContext *cct;
  OSD *osd;

  std::string shard_name;

  std::string sdata_wait_lock_name;
  ceph::mutex sdata_wait_lock;
  ceph::condition_variable sdata_cond;
  int waiting_threads = 0;

  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
  OSDMapRef shard_osdmap;

  OSDMapRef get_osdmap() {
    std::lock_guard l(osdmap_lock);
    return shard_osdmap;
  }

  std::string shard_lock_name;
  ceph::mutex shard_lock;   ///< protects remaining members below

  /// map of slots for each spg_t. maintains ordering of items dequeued
  /// from scheduler while _process thread drops shard lock to acquire the
  /// pg lock. stale slots are removed by consume_map.
  std::unordered_map<spg_t,std::unique_ptr<OSDShardPGSlot>> pg_slots;

  struct pg_slot_compare_by_epoch {
    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
      return l.epoch < r.epoch;
    }
  };

  /// maintain an ordering of pg slots by pg epoch
  boost::intrusive::multiset<
    OSDShardPGSlot,
    boost::intrusive::member_hook<
      OSDShardPGSlot,
      boost::intrusive::set_member_hook<>,
      &OSDShardPGSlot::pg_epoch_item>,
    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
  int waiting_for_min_pg_epoch = 0;
  ceph::condition_variable min_pg_epoch_cond;

  /// priority queue
  ceph::osd::scheduler::OpSchedulerRef scheduler;

  bool stop_waiting = false;

  ContextQueue context_queue;

  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
  void _detach_pg(OSDShardPGSlot *slot);

  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
  epoch_t get_min_pg_epoch();
  void wait_min_pg_epoch(epoch_t need);

  /// return newest epoch we are waiting for
  epoch_t get_max_waiting_epoch();

  /// push osdmap into shard
  void consume_map(
    const OSDMapRef& osdmap,
    unsigned *pushes_to_free);

  void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);

  void identify_splits_and_merges(
    const OSDMapRef& as_of_osdmap,
    std::set<std::pair<spg_t,epoch_t>> *split_children,
    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
  void _prime_splits(std::set<std::pair<spg_t,epoch_t>> *pgids);
  void prime_splits(const OSDMapRef& as_of_osdmap,
		    std::set<std::pair<spg_t,epoch_t>> *pgids);
  void prime_merges(const OSDMapRef& as_of_osdmap,
		    std::set<std::pair<spg_t,epoch_t>> *merge_pgs);
  void register_and_wake_split_child(PG *pg);
  void unprime_split_children(spg_t parent, unsigned old_pg_num);

  OSDShard(
    int id,
    CephContext *cct,
    OSD *osd);
};

class OSD : public Dispatcher,
	    public md_config_obs_t {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;

  /** OSD **/
  // global lock
  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
  SafeTimer tick_timer;    // safe timer (osd_lock)

  // Tick timer for stuff that does not need osd_lock
  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
  SafeTimer tick_timer_without_osd_lock;
  std::string gss_ktfile_client{};

public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
			  const std::set <std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:

  const double OSD_TICK_INTERVAL = { 1.0 };
  double get_tick_interval() const;

  Messenger *cluster_messenger;
  Messenger *client_messenger;
  Messenger *objecter_messenger;
  MonClient *monc;  // check the "monc helpers" list before accessing directly
  MgrClient mgrc;
  PerfCounters *logger;
  PerfCounters *recoverystate_perf;
  ObjectStore *store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  ceph_release_t last_require_osd_release{ceph_release_t::unknown};

  int numa_node = -1;
  size_t numa_cpu_set_size = 0;
  cpu_set_t numa_cpu_set;

  bool store_is_rotational = true;
  bool journal_is_rotational = true;

  ZTracer::Endpoint trace_endpoint;
  PerfCounters* create_logger();
  PerfCounters* create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features();

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  void asok_command(
    std::string_view prefix,
    const cmdmap_t& cmdmap,
    ceph::Formatter *f,
    const ceph::buffer::list& inbl,
    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);

public:
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
	object_t("snapmapper"),
	0)));
  }
  static ghobject_t make_purged_snaps_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
	object_t("purged_snaps"),
	0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    std::stringstream ss;
    ss << "pglog_" << pg;
    std::string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    std::stringstream ss;
    ss << "pginfo_" << pg;
    std::string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }

  static ghobject_t make_final_pool_info_oid(int64_t pool) {
    return ghobject_t(
      hobject_t(
	sobject_t(
	  object_t(std::string("final_pool_") + stringify(pool)),
	  CEPH_NOSNAP)));
  }

  static ghobject_t make_pg_num_history_oid() {
    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
  }

  static void recursive_remove_collection(CephContext* cct,
					  ObjectStore *store,
					  spg_t pgid,
					  coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD.  Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();

private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- config settings --
  float m_osd_pg_epoch_max_lag_factor;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic<int> state{STATE_INITIALIZING};

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:

  ShardedThreadPool osd_op_tp;

  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);

  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
  std::set<ceph::ref_t<Session>> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(std::set<ceph::ref_t<Session>> *out) {
    std::lock_guard l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.insert(session);
  }
  void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.erase(session);
  }
  void dispatch_sessions_waiting_on_map() {
    std::set<ceph::ref_t<Session>> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (auto i = sessions_to_check.begin();
	 i != sessions_to_check.end();
	 sessions_to_check.erase(i++)) {
      std::lock_guard l{(*i)->session_dispatch_lock};
      dispatch_session_waiting(*i, get_osdmap());
    }
  }
  void session_handle_reset(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   *  the monitors; false if an outstanding request that encompasses it is
   *  sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
  epoch_t latest_subscribed_epoch{0};

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                  ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;          ///< time we sent our first ping request
    utime_t last_tx;           ///< last time we sent a ping request
    utime_t last_rx_front;     ///< last time we got a ping reply on the front side
    utime_t last_rx_back;      ///< last time we got a ping reply on the back side
    epoch_t epoch;             ///< most recent epoch we wanted this peer
    /// number of connections we send and receive heartbeat pings/replies
    static constexpr int HEARTBEAT_MAX_CONN = 2;
    /// history of inflight pings, arranged by the timestamp we sent
    /// send time -> deadline -> remaining replies
    std::map<utime_t, std::pair<utime_t, int>> ping_history;

    utime_t hb_interval_start;
    uint32_t hb_average_count = 0;
    uint32_t hb_index = 0;

    uint32_t hb_total_back = 0;
    uint32_t hb_min_back = UINT_MAX;
    uint32_t hb_max_back = 0;
    std::vector<uint32_t> hb_back_pingtime;
    std::vector<uint32_t> hb_back_min;
    std::vector<uint32_t> hb_back_max;

    uint32_t hb_total_front = 0;
    uint32_t hb_min_front = UINT_MAX;
    uint32_t hb_max_front = 0;
    std::vector<uint32_t> hb_front_pingtime;
    std::vector<uint32_t> hb_front_min;
    std::vector<uint32_t> hb_front_max;

    bool is_stale(utime_t stale) {
      if (ping_history.empty()) {
	return false;
      }
      utime_t oldest_deadline = ping_history.begin()->second.first;
      return oldest_deadline <= stale;
    }

    bool is_unhealthy(utime_t now) {
      if (ping_history.empty()) {
	/// we haven't sent a ping yet, or we have got all replies;
	/// either way we are safe and healthy for now
	return false;
      }

      utime_t oldest_deadline = ping_history.begin()->second.first;
      return now > oldest_deadline;
    }

    bool is_healthy(utime_t now) {
      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
	// only declare ourselves healthy once we have received the first
	// replies from both front/back connections
	return false;
      }
      return !is_unhealthy(now);
    }

    void clear_mark_down(Connection *except = nullptr) {
      if (con_back && con_back != except) {
	con_back->mark_down();
	con_back->clear_priv();
	con_back.reset(nullptr);
      }
      if (con_front && con_front != except) {
	con_front->mark_down();
	con_front->clear_priv();
	con_front.reset(nullptr);
      }
    }
  };
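
  /*
   * Illustrative timeline for ping_history (not upstream code): each
   * ping sent at time T with deadline D records
   *
   *   ping_history[T] = {D, HEARTBEAT_MAX_CONN};  // replies still owed
   *
   * replies decrement the count and erase the entry once it reaches
   * zero, so is_unhealthy(now) reduces to "the oldest outstanding
   * deadline has passed".
   */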

  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
  std::map<int, int> debug_heartbeat_drops_remaining;
  ceph::condition_variable heartbeat_cond;
  bool heartbeat_stop;
  std::atomic<bool> heartbeat_need_update;
  std::map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
  utime_t last_mon_heartbeat;
  Messenger *hb_front_client_messenger;
  Messenger *hb_back_client_messenger;
  Messenger *hb_front_server_messenger;
  Messenger *hb_back_server_messenger;
  utime_t last_heartbeat_resample;  ///< last time we chose random peers in waiting-for-healthy state
  double daily_loadavg;
  ceph::mono_time startup_time;

  // Track ping response times using a vector as a circular buffer
  // MUST BE A POWER OF 2
  const uint32_t hb_vector_size = 16;

  void _add_heartbeat_peer(int p);
  void _remove_heartbeat_peer(int p);
  bool heartbeat_reset(Connection *con);
  void maybe_update_heartbeat_peers();
  void reset_heartbeat_peers(bool all);
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  void heartbeat_kick() {
    std::lock_guard l(heartbeat_lock);
    heartbeat_cond.notify_all();
  }

  struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
      osd->heartbeat_entry();
      return 0;
    }
  } heartbeat_thread;

public:
  bool heartbeat_dispatch(Message *m);

  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
	return true;
      default:
	return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    int ms_handle_authentication(Connection *con) override {
      return true;
    }
  } heartbeat_dispatcher;

private:
  // -- waiters --
  std::list<OpRequestRef> finished;

  void take_waiters(std::list<OpRequestRef>& ls) {
    ceph_assert(ceph_mutex_is_locked(osd_lock));
    finished.splice(finished.end(), ls);
  }
  void do_waiters();

  // -- op tracking --
  OpTracker op_tracker;
  void test_ops(std::string command, std::string args, std::ostream& ss);
  friend class TestOpsSocketHook;
  TestOpsSocketHook *test_ops_hook;
  friend struct C_FinishSplits;
  friend struct C_OpenPGs;

protected:
1573
1574 /*
1575 * The ordered op delivery chain is:
1576 *
1577 * fast dispatch -> scheduler back
1578 * scheduler front <-> to_process back
1579 * to_process front -> RunVis(item)
1580 * <- queue_front()
1581 *
1582 * The scheduler is per-shard, and to_process is per pg_slot. Items can be
1583 * pushed back up into to_process and/or scheduler while order is preserved.
1584 *
1585 * Multiple worker threads can operate on each shard.
1586 *
11fdf7f2 1587 * Under normal circumstances, num_running == to_process.size(). There are
1588 * two times when that is not true: (1) when waiting_for_pg == true and
1589 * to_process is accumulating requests that are waiting for the pg to be
1590 * instantiated; in that case they will all get requeued together by
1591 * wake_pg_waiters, and (2) right after wake_pg_waiters has run: it has
1592 * already requeued the items, but the workers have not yet picked them up.
1593 */
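  /*
   * Condensed, illustrative pseudocode of the flow described above (the
   * authoritative logic lives in ShardedOpWQ::_process()):
   *
   *   item = scheduler.dequeue();            // per-shard scheduler front
   *   slot = shard.pg_slots[item's pg token];
   *   slot->to_process.push_back(item);      // per-pg_slot ordering point
   *   process(slot->to_process.front());     // RunVis(item)
   */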
1594 friend class ceph::osd::scheduler::PGOpItem;
1595 friend class ceph::osd::scheduler::PGPeeringItem;
1596 friend class ceph::osd::scheduler::PGRecovery;
f67539c2 1597 friend class ceph::osd::scheduler::PGRecoveryMsg;
9f95a23c 1598 friend class ceph::osd::scheduler::PGDelete;
224ce89b 1599
7c673cae 1600 class ShardedOpWQ
9f95a23c 1601 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
7c673cae 1602 {
7c673cae 1603 OSD *osd;
1604
1605 public:
11fdf7f2 1606 ShardedOpWQ(OSD *o,
1607 ceph::timespan ti,
1608 ceph::timespan si,
7c673cae 1609 ShardedThreadPool* tp)
9f95a23c 1610 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
11fdf7f2 1611 osd(o) {
1612 }
1613
1614 void _add_slot_waiter(
1615 spg_t token,
1616 OSDShardPGSlot *slot,
9f95a23c 1617 OpSchedulerItem&& qi);
1618
1619 /// try to do some work
f67539c2 1620 void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) override;
1621
1622 /// enqueue a new item
9f95a23c 1623 void _enqueue(OpSchedulerItem&& item) override;
1624
1625 /// requeue an old item (at the front of the line)
9f95a23c 1626 void _enqueue_front(OpSchedulerItem&& item) override;
f67539c2 1627
7c673cae 1628 void return_waiting_threads() override {
1629 for (uint32_t i = 0; i < osd->num_shards; i++) {
1630 OSDShard* sdata = osd->shards[i];
1631 ceph_assert(sdata);
1632 std::scoped_lock l{sdata->sdata_wait_lock};
1633 sdata->stop_waiting = true;
1634 sdata->sdata_cond.notify_all();
1635 }
1636 }
1637
1638 void stop_return_waiting_threads() override {
1639 for (uint32_t i = 0; i < osd->num_shards; i++) {
1640 OSDShard* sdata = osd->shards[i];
7c673cae 1641 ceph_assert(sdata);
1642 std::scoped_lock l{sdata->sdata_wait_lock};
1643 sdata->stop_waiting = false;
1644 }
1645 }
1646
f67539c2 1647 void dump(ceph::Formatter *f) {
1648 for (uint32_t i = 0; i < osd->num_shards; i++) {
1649 auto &&sdata = osd->shards[i];
1650
1651 char queue_name[32] = {0};
1652 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1653 ceph_assert(sdata);
1654
1655 std::scoped_lock l{sdata->shard_lock};
1656 f->open_object_section(queue_name);
9f95a23c 1657 sdata->scheduler->dump(*f);
7c673cae 1658 f->close_section();
1659 }
1660 }
1661
1662 bool is_shard_empty(uint32_t thread_index) override {
1663 uint32_t shard_index = thread_index % osd->num_shards;
1664 auto &&sdata = osd->shards[shard_index];
1665 ceph_assert(sdata);
1666 std::lock_guard l(sdata->shard_lock);
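      // Only the lowest-indexed worker mapped to each shard (thread_index <
      // num_shards) also drains that shard's context_queue, hence the split
      // below.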
1667 if (thread_index < osd->num_shards) {
9f95a23c 1668 return sdata->scheduler->empty() && sdata->context_queue.empty();
11fdf7f2 1669 } else {
9f95a23c 1670 return sdata->scheduler->empty();
7c673cae 1671 }
11fdf7f2 1672 }
7c673cae 1673
f67539c2 1674 void handle_oncommits(std::list<Context*>& oncommits) {
1675 for (auto p : oncommits) {
1676 p->complete(0);
1677 }
1678 }
1679 } op_shardedwq;
1680
1681
11fdf7f2 1682 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
1683 void dequeue_op(
1684 PGRef pg, OpRequestRef op,
1685 ThreadPool::TPHandle &handle);
1686
1687 void enqueue_peering_evt(
1688 spg_t pgid,
1689 PGPeeringEventRef ref);
1690 void dequeue_peering_evt(
1691 OSDShard *sdata,
1692 PG *pg,
1693 PGPeeringEventRef ref,
1694 ThreadPool::TPHandle& handle);
1695
1696 void dequeue_delete(
1697 OSDShard *sdata,
1698 PG *pg,
1699 epoch_t epoch,
1700 ThreadPool::TPHandle& handle);
1701
1702 friend class PG;
f67539c2 1703 friend struct OSDShard;
7c673cae 1704 friend class PrimaryLogPG;
f67539c2 1705 friend class PgScrubber;
1706
1707
1708 protected:
1709
1710 // -- osd map --
1711 // TODO: switch to std::atomic<OSDMapRef> once C++20 is available.
1712 OSDMapRef _osdmap;
1713 void set_osdmap(OSDMapRef osdmap) {
1714 std::atomic_store(&_osdmap, osdmap);
1715 }
1716 OSDMapRef get_osdmap() const {
1717 return std::atomic_load(&_osdmap);
7c673cae 1718 }
224ce89b 1719 epoch_t get_osdmap_epoch() const {
1720 // XXX: performance?
1721 auto osdmap = get_osdmap();
1722 return osdmap ? osdmap->get_epoch() : 0;
1723 }
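  // Note: the std::atomic_load/std::atomic_store overloads for shared_ptr
  // used above are deprecated in C++20 in favour of the specialization the
  // TODO mentions; a sketch of the intended replacement once C++20 is
  // available:
  //
  //   std::atomic<OSDMapRef> _osdmap;
  //   void set_osdmap(OSDMapRef osdmap) { _osdmap.store(std::move(osdmap)); }
  //   OSDMapRef get_osdmap() const { return _osdmap.load(); }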
1724
1725 pool_pg_num_history_t pg_num_history;
1726
9f95a23c 1727 ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
1728 std::list<OpRequestRef> waiting_for_osdmap;
1729 std::deque<utime_t> osd_markdown_log;
1730
1731 friend struct send_map_on_destruct;
1732
1733 void wait_for_new_map(OpRequestRef op);
1734 void handle_osd_map(class MOSDMap *m);
1735 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1736 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1737 void note_down_osd(int osd);
1738 void note_up_osd(int osd);
f67539c2 1739 friend struct C_OnMapCommit;
1740
1741 bool advance_pg(
1742 epoch_t advance_to,
1743 PG *pg,
7c673cae 1744 ThreadPool::TPHandle &handle,
9f95a23c 1745 PeeringCtx &rctx);
1746 void consume_map();
1747 void activate_map();
1748
1749 // osd map cache (past osd maps)
1750 OSDMapRef get_map(epoch_t e) {
1751 return service.get_map(e);
1752 }
1753 OSDMapRef add_map(OSDMap *o) {
1754 return service.add_map(o);
1755 }
f67539c2 1756 bool get_map_bl(epoch_t e, ceph::buffer::list& bl) {
1757 return service.get_map_bl(e, bl);
1758 }
1759
1760public:
1761 // -- shards --
f67539c2 1762 std::vector<OSDShard*> shards;
1763 uint32_t num_shards = 0;
1764
1765 void inc_num_pgs() {
1766 ++num_pgs;
1767 }
1768 void dec_num_pgs() {
1769 --num_pgs;
1770 }
1771 int get_num_pgs() const {
1772 return num_pgs;
1773 }
1774
1775protected:
9f95a23c 1776 ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
11fdf7f2 1777 /// merge epoch -> target pgid -> source pgid -> pg
f67539c2 1778 std::map<epoch_t,std::map<spg_t,std::map<spg_t,PGRef>>> merge_waiters;
1779
1780 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1781 unsigned need);
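  // Worked example of the merge_waiters shape (pgids and names illustrative):
  // if pg_num shrinkage at map epoch E merges sources 1.5 and 1.7 into target
  // 1.1, the sources are parked here until all 'need' pieces have arrived:
  //
  //   merge_waiters[E][target 1.1] == { {1.5, srcpg5}, {1.7, srcpg7} }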
1782
7c673cae 1783 // -- placement groups --
11fdf7f2 1784 std::atomic<size_t> num_pgs = {0};
7c673cae 1785
3efd9988 1786 std::mutex pending_creates_lock;
9f95a23c 1787 using create_from_osd_t = std::pair<spg_t, bool /* is primary */>;
b32b8144 1788 std::set<create_from_osd_t> pending_creates_from_osd;
1789 unsigned pending_creates_from_mon = 0;
1790
1791 PGRecoveryStats pg_recovery_stats;
1792
1793 PGRef _lookup_pg(spg_t pgid);
1794 PGRef _lookup_lock_pg(spg_t pgid);
1795 void register_pg(PGRef pg);
1796 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
7c673cae 1797
1798 void _get_pgs(std::vector<PGRef> *v, bool clear_too=false);
1799 void _get_pgids(std::vector<spg_t> *v);
1800
1801public:
11fdf7f2 1802 PGRef lookup_lock_pg(spg_t pgid);
31f18b77 1803
11fdf7f2 1804 std::set<int64_t> get_mapped_pools();
35e4c445 1805
31f18b77 1806protected:
7c673cae 1807 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
7c673cae 1808
1809 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1810 spg_t pgid, bool is_mon_create);
1811 void resume_creating_pg();
1812
7c673cae 1813 void load_pgs();
1814
1815 /// build initial pg history and intervals on create
1816 void build_initial_pg_history(
1817 spg_t pgid,
1818 epoch_t created,
1819 utime_t created_stamp,
1820 pg_history_t *h,
1821 PastIntervals *pi);
1822
1823 epoch_t last_pg_create_epoch;
1824
1825 void handle_pg_create(OpRequestRef op);
1826
1827 void split_pgs(
1828 PG *parent,
f67539c2 1829 const std::set<spg_t> &childpgids, std::set<PGRef> *out_pgs,
1830 OSDMapRef curmap,
1831 OSDMapRef nextmap,
9f95a23c 1832 PeeringCtx &rctx);
f67539c2 1833 void _finish_splits(std::set<PGRef>& pgs);
1834
1835 // == monitor interaction ==
9f95a23c 1836 ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
7c673cae 1837 utime_t last_mon_report;
11fdf7f2 1838 Finisher boot_finisher;
1839
1840 // -- boot --
1841 void start_boot();
1842 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
1843 void _preboot(epoch_t oldest, epoch_t newest);
1844 void _send_boot();
f67539c2 1845 void _collect_metadata(std::map<std::string,std::string> *pmeta);
1846 void _get_purged_snaps();
1847 void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);
1848
1849 void start_waiting_for_healthy();
1850 bool _is_healthy();
1851
1852 void send_full_update();
1853
1854 friend struct CB_OSD_GetVersion;
1855
1856 // -- alive --
1857 epoch_t up_thru_wanted;
1858
1859 void queue_want_up_thru(epoch_t want);
1860 void send_alive();
1861
1862 // -- full map requests --
1863 epoch_t requested_full_first, requested_full_last;
1864
1865 void request_full_map(epoch_t first, epoch_t last);
1866 void rerequest_full_maps() {
1867 epoch_t first = requested_full_first;
1868 epoch_t last = requested_full_last;
1869 requested_full_first = 0;
1870 requested_full_last = 0;
1871 request_full_map(first, last);
1872 }
1873 void got_full_map(epoch_t e);
1874
1875 // -- failures --
1876 std::map<int,utime_t> failure_queue;
1877 std::map<int,std::pair<utime_t,entity_addrvec_t> > failure_pending;
1878
1879 void requeue_failures();
1880 void send_failures();
1881 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
1882 void cancel_pending_failures();
1883
1884 ceph::coarse_mono_clock::time_point last_sent_beacon;
9f95a23c 1885 ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
1886 epoch_t min_last_epoch_clean = 0;
1887 // which pgs were scanned for min_lec
1888 std::vector<pg_t> min_last_epoch_clean_pgs;
1889 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
1890
1891 ceph_tid_t get_tid() {
1892 return service.get_tid();
1893 }
1894
1895 double scrub_sleep_time(bool must_scrub);
1896
7c673cae 1897 // -- generic pg peering --
1898 PeeringCtx create_context();
1899 void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
7c673cae 1900 ThreadPool::TPHandle *handle = NULL);
1901
1902 bool require_mon_peer(const Message *m);
1903 bool require_mon_or_mgr_peer(const Message *m);
1904 bool require_osd_peer(const Message *m);
1905 /**
1906 * Verifies that we were alive in the given epoch, and that we
1907 * still are.
1908 */
1909 bool require_self_aliveness(const Message *m, epoch_t alive_since);
1910 /**
1911 * Verifies that the OSD who sent the given op has the same
f67539c2 1912 * address as in the given OSDMap.
1913 * @pre op was sent by an OSD using the cluster messenger
1914 */
9f95a23c 1915 bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
1916 bool is_fast_dispatch);
1917
1918 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
1919 bool is_fast_dispatch);
1920
1921 void handle_fast_pg_create(MOSDPGCreate2 *m);
1922 void handle_fast_pg_query(MOSDPGQuery *m);
1923 void handle_pg_query_nopg(const MQuery& q);
1924 void handle_fast_pg_notify(MOSDPGNotify *m);
1925 void handle_pg_notify_nopg(const MNotifyRec& q);
1926 void handle_fast_pg_info(MOSDPGInfo *m);
1927 void handle_fast_pg_remove(MOSDPGRemove *m);
7c673cae 1928
1929public:
1930 // used by OSDShard
1931 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
1932protected:
c07f9fc5 1933
11fdf7f2 1934 void handle_fast_force_recovery(MOSDForceRecovery *m);
1935
1936 // -- commands --
7c673cae 1937 void handle_command(class MCommand *m);
11fdf7f2 1938
1939
1940 // -- pg recovery --
1941 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
1942 ThreadPool::TPHandle &handle);
1943
1944
1945 // -- scrubbing --
1946 void sched_scrub();
494da23a 1947 void resched_all_scrubs();
1948 bool scrub_random_backoff();
1949 bool scrub_load_below_threshold();
1950 bool scrub_time_permit(utime_t now);
1951
1952 // -- status reporting --
1953 MPGStats *collect_pg_stats();
1954 std::vector<DaemonHealthMetric> get_health_metrics();
1955
b32b8144 1956
224ce89b 1957private:
1958 bool ms_can_fast_dispatch_any() const override { return true; }
1959 bool ms_can_fast_dispatch(const Message *m) const override {
1960 switch (m->get_type()) {
11fdf7f2 1961 case CEPH_MSG_PING:
1962 case CEPH_MSG_OSD_OP:
1963 case CEPH_MSG_OSD_BACKOFF:
1964 case MSG_OSD_SCRUB2:
1965 case MSG_OSD_FORCE_RECOVERY:
1966 case MSG_MON_COMMAND:
1967 case MSG_OSD_PG_CREATE2:
1968 case MSG_OSD_PG_QUERY:
9f95a23c 1969 case MSG_OSD_PG_QUERY2:
11fdf7f2 1970 case MSG_OSD_PG_INFO:
9f95a23c 1971 case MSG_OSD_PG_INFO2:
11fdf7f2 1972 case MSG_OSD_PG_NOTIFY:
9f95a23c 1973 case MSG_OSD_PG_NOTIFY2:
1974 case MSG_OSD_PG_LOG:
1975 case MSG_OSD_PG_TRIM:
1976 case MSG_OSD_PG_REMOVE:
1977 case MSG_OSD_BACKFILL_RESERVE:
1978 case MSG_OSD_RECOVERY_RESERVE:
7c673cae 1979 case MSG_OSD_REPOP:
1980 case MSG_OSD_REPOPREPLY:
1981 case MSG_OSD_PG_PUSH:
1982 case MSG_OSD_PG_PULL:
1983 case MSG_OSD_PG_PUSH_REPLY:
1984 case MSG_OSD_PG_SCAN:
1985 case MSG_OSD_PG_BACKFILL:
1986 case MSG_OSD_PG_BACKFILL_REMOVE:
1987 case MSG_OSD_EC_WRITE:
1988 case MSG_OSD_EC_WRITE_REPLY:
1989 case MSG_OSD_EC_READ:
1990 case MSG_OSD_EC_READ_REPLY:
1991 case MSG_OSD_SCRUB_RESERVE:
1992 case MSG_OSD_REP_SCRUB:
1993 case MSG_OSD_REP_SCRUBMAP:
1994 case MSG_OSD_PG_UPDATE_LOG_MISSING:
1995 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
1996 case MSG_OSD_PG_RECOVERY_DELETE:
1997 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
1998 case MSG_OSD_PG_LEASE:
1999 case MSG_OSD_PG_LEASE_ACK:
2000 return true;
2001 default:
2002 return false;
2003 }
2004 }
2005 void ms_fast_dispatch(Message *m) override;
7c673cae 2006 bool ms_dispatch(Message *m) override;
2007 void ms_handle_connect(Connection *con) override;
2008 void ms_handle_fast_connect(Connection *con) override;
2009 void ms_handle_fast_accept(Connection *con) override;
11fdf7f2 2010 int ms_handle_authentication(Connection *con) override;
2011 bool ms_handle_reset(Connection *con) override;
2012 void ms_handle_remote_reset(Connection *con) override {}
2013 bool ms_handle_refused(Connection *con) override;
2014
2015 public:
2016 /* internal and external can point to the same messenger; they will still
2017 * be cleaned up properly */
2018 OSD(CephContext *cct_,
2019 ObjectStore *store_,
2020 int id,
2021 Messenger *internal,
2022 Messenger *external,
2023 Messenger *hb_front_client,
2024 Messenger *hb_back_client,
2025 Messenger *hb_front_server,
2026 Messenger *hb_back_server,
2027 Messenger *osdc_messenger,
2028 MonClient *mc, const std::string &dev, const std::string &jdev,
2029 ceph::async::io_context_pool& poolctx);
2030 ~OSD() override;
2031
2032 // static bits
e306af50 2033 static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, std::string osdspec_affinity);
11fdf7f2 2034
2035 /* remove any non-user xattrs from a map of them */
2036 void filter_xattrs(std::map<std::string, ceph::buffer::ptr>& attrs) {
2037 for (std::map<std::string, ceph::buffer::ptr>::iterator iter = attrs.begin();
2038 iter != attrs.end();
2039 ) {
2040 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2041 attrs.erase(iter++);
2042 else ++iter;
2043 }
2044 }
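  // The attrs.erase(iter++) idiom above is the pre-C++11 way to erase while
  // iterating; an equivalent modern form (same behaviour) would be:
  //
  //   iter = attrs.erase(iter);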
2045
2046private:
f67539c2 2047 int mon_cmd_maybe_osd_create(std::string &cmd);
2048 int update_crush_device_class();
2049 int update_crush_location();
2050
2051 static int write_meta(CephContext *cct,
2052 ObjectStore *store,
e306af50 2053 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami, std::string& osdspec_affinity);
7c673cae 2054
2055 void handle_scrub(class MOSDScrub *m);
2056 void handle_fast_scrub(class MOSDScrub2 *m);
2057 void handle_osd_ping(class MOSDPing *m);
2058
9f95a23c 2059 size_t get_num_cache_shards();
2060 int get_num_op_shards();
2061 int get_num_op_threads();
2062
c07f9fc5 2063 float get_osd_recovery_sleep();
11fdf7f2 2064 float get_osd_delete_sleep();
494da23a 2065 float get_osd_snap_trim_sleep();
11fdf7f2 2066
2067 int get_recovery_max_active();
2068
2069 void scrub_purged_snaps();
f67539c2 2070 void probe_smart(const std::string& devid, std::ostream& ss);
c07f9fc5 2071
7c673cae 2072public:
11fdf7f2 2073 static int peek_meta(ObjectStore *store,
f67539c2 2074 std::string *magic,
2075 uuid_d *cluster_fsid,
2076 uuid_d *osd_fsid,
2077 int *whoami,
9f95a23c 2078 ceph_release_t *min_osd_release);
f67539c2 2079
2080
2081 // startup/shutdown
2082 int pre_init();
2083 int init();
2084 void final_init();
2085
2086 int enable_disable_fuse(bool stop);
11fdf7f2 2087 int set_numa_affinity();
2088
2089 void suicide(int exitcode);
2090 int shutdown();
2091
2092 void handle_signal(int signum);
2093
2094 /// check if we can throw out op from a disconnected client
2095 static bool op_is_discardable(const MOSDOp *m);
2096
2097public:
2098 OSDService service;
2099 friend class OSDService;
2100
2101private:
2102 void set_perf_queries(const ConfigPayload &config_payload);
2103 MetricPayload get_perf_reports();
11fdf7f2 2104
9f95a23c 2105 ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
2106 std::list<OSDPerfMetricQuery> m_perf_queries;
2107 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
2108};
2109
224ce89b 2110
2111// compatibility of the executable
2112extern const CompatSet::Feature ceph_osd_feature_compat[];
2113extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2114extern const CompatSet::Feature ceph_osd_feature_incompat[];
2115
224ce89b 2116#endif // CEPH_OSD_H