// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/config_cacher.h"
#include "common/zipkin_trace.h"
#include "common/ceph_timer.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"
#include "OSDCap.h"

#include "auth/KeyRing.h"

#include "osd/ClassHandler.h"

#include "include/CompatSet.h"
#include "include/common_fwd.h"

#include "OpRequest.h"
#include "Session.h"

#include "osd/scheduler/OpScheduler.h"

#include <atomic>
#include <map>
#include <memory>
#include <string>

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"

#define CEPH_OSD_PROTOCOL 10 /* cluster internal */

/*

  lock ordering for pg map

    PG::lock
      ShardData::lock
        OSD::pg_map_lock

 */

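/*
 * A minimal sketch (not from the original source) of what the ordering
 * above implies for code that needs both a PG and its shard state:
 * take PG::lock before ShardData::lock, never the reverse, e.g.
 *
 *   pg->lock();                               // PG::lock first
 *   {
 *     std::lock_guard l(sdata->shard_lock);   // then the shard's lock
 *     // ... touch shard-local pg slot state ...
 *   }
 *   pg->unlock();
 *
 * Acquiring the two in the opposite order on another thread risks
 * deadlock.
 */
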
class Messenger;
class Message;
class MonClient;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;
class KeyStore;

class Watch;
class PrimaryLogPG;

class TestOpsSocketHook;
struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;
class MOSDOp;

class MOSDPGCreate2;
class MOSDPGQuery;
class MOSDPGNotify;
class MOSDPGInfo;
class MOSDPGRemove;
class MOSDForceRecovery;
class MMonGetPurgedSnapsReply;

class OSD;

class OSDService {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
public:
  OSD *osd;
  CephContext *cct;
  ObjectStore::CollectionHandle meta_ch;
  const int whoami;
  ObjectStore *&store;
  LogClient &log_client;
  LogChannelRef clog;
  PGRecoveryStats &pg_recovery_stats;
private:
  Messenger *&cluster_messenger;
  Messenger *&client_messenger;
public:
  PerfCounters *&logger;
  PerfCounters *&recoverystate_perf;
  MonClient *&monc;

  md_config_cacher_t<Option::size_t> osd_max_object_size;
  md_config_cacher_t<bool> osd_skip_data_digest;

  void enqueue_back(OpSchedulerItem&& qi);
  void enqueue_front(OpSchedulerItem&& qi);

  void maybe_inject_dispatch_delay() {
    if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
      if (rand() % 10000 <
          g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
        t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
      }
    }
  }

  ceph::signedspan get_mnow();

private:
  // -- superblock --
  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    std::lock_guard l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    std::lock_guard l(publish_lock);
    superblock = block;
  }

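  /*
   * A hypothetical usage sketch (not from the original source): callers
   * take a private copy under publish_lock rather than holding a
   * reference into the member, so the publisher can swap it at any time:
   *
   *   OSDSuperblock sb = service.get_superblock();  // copied under lock
   *   use(sb);                                      // safe after unlock
   *
   * The same copy-out pattern protects the published osdmap below.
   */
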
  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    std::lock_guard l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    std::lock_guard l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    std::lock_guard l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre_published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  ceph::condition_variable pre_publish_cond;
  int pre_publish_waiter = 0;

public:
  void pre_publish_map(OSDMapRef map) {
    std::lock_guard l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    std::lock_guard l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    epoch_t e = next_osdmap->get_epoch();
    map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    std::lock_guard l(pre_publish_lock);
    map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    ceph_assert(i != map_reservations.end());
    ceph_assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    if (pre_publish_waiter) {
      pre_publish_cond.notify_all();
    }
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    std::unique_lock l{pre_publish_lock};
    ceph_assert(next_osdmap);
    pre_publish_waiter++;
    pre_publish_cond.wait(l, [this] {
      auto i = map_reservations.cbegin();
      return (i == map_reservations.cend() ||
              i->first >= next_osdmap->get_epoch());
    });
    pre_publish_waiter--;
  }
  OSDMapRef get_next_osdmap() {
    std::lock_guard l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    return next_osdmap;
  }

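  /*
   * A minimal usage sketch (not from the original source) for the
   * reservation API above: reserve the pre-published map, use it to
   * address a peer, then release it so publication can proceed:
   *
   *   OSDMapRef m = service.get_nextmap_reserved();
   *   if (m) {
   *     ConnectionRef con = service.get_con_osd_cluster(peer, m->get_epoch());
   *     // ... queue a message on con ...
   *     service.release_map(m);  // every reserve needs a matching release
   *   }
   *
   * await_reserved_maps() blocks the publisher until all reservations on
   * epochs older than next_osdmap have been released.
   */
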
  void maybe_share_map(Connection *con,
                       const OSDMapRef& osdmap,
                       epoch_t peer_epoch_lb=0);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con,
                            const OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                     OSDSuperblock& superblock);

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch);
  void send_message_osd_cluster(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() const;

private:
  // -- scrub scheduling --
  ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
  int scrubs_local;
  int scrubs_remote;

public:
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// the time the scrub is scheduled for; it may be delayed if the
    /// system load is too high or it falls outside the configured scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
                      const utime_t& timestamp,
                      double pool_scrub_min_interval = 0,
                      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
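  /*
   * A hypothetical sketch (not from the original source) of how the
   * sched_time ordering drives this queue: jobs sort ascending by
   * sched_time, so first_scrub_stamp() always yields the job due soonest:
   *
   *   utime_t stamp = service.reg_pg_scrub(pgid, now, min_iv, max_iv, false);
   *   OSDService::ScrubJob next;
   *   if (service.first_scrub_stamp(&next)) {
   *     // next.sched_time <= sched_time of every other queued job
   *   }
   *   service.unreg_pg_scrub(pgid, stamp);  // stamp identifies the entry
   */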
  set<ScrubJob> sched_scrub_pg;

  /// @returns the scrub_reg_stamp used to unregister the scrub job
  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
                       double pool_scrub_max_interval, bool must) {
    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
                   must);
    std::lock_guard l(sched_scrub_lock);
    sched_scrub_pg.insert(scrub);
    return scrub.sched_time;
  }
  void unreg_pg_scrub(spg_t pgid, utime_t t) {
    std::lock_guard l(sched_scrub_lock);
    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
    ceph_assert(removed);
  }
  bool first_scrub_stamp(ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
    *out = *iter;
    return true;
  }
  bool next_scrub_stamp(const ScrubJob& next,
                        ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }

  void dumps_scrub(Formatter *f) {
    ceph_assert(f != nullptr);
    std::lock_guard l(sched_scrub_lock);

    f->open_array_section("scrubs");
    for (const auto &i: sched_scrub_pg) {
      f->open_object_section("scrub");
      f->dump_stream("pgid") << i.pgid;
      f->dump_stream("sched_time") << i.sched_time;
      f->dump_stream("deadline") << i.deadline;
      f->dump_bool("forced", i.sched_time == PG::Scrubber::scrub_must_stamp());
      f->close_section();
    }
    f->close_section();
  }

  bool can_inc_scrubs();
  bool inc_scrubs_local();
  void dec_scrubs_local();
  bool inc_scrubs_remote();
  void dec_scrubs_remote();
  void dump_scrub_reservations(Formatter *f);

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv,
                      vector<pg_log_op_return_item_t> op_returns);
  void handle_misdirected_op(PG *pg, OpRequestRef op);


private:
  // -- agent shared state --
  ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
  ceph::condition_variable agent_cond;
  map<uint64_t, set<PGRef> > agent_queue;
  set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count; ///< once any pg enters FLUSH_MODE_HIGH, flush objects at high speed
  set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false;  // inserting higher-priority queue
    set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.notify_all();
    nq.insert(pg);
  }

  void _dequeue(PG *pg, uint64_t old_priority) {
    set<PGRef>& oq = agent_queue[old_priority];
    set<PGRef>::iterator p = oq.find(pg);
    ceph_assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }
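  /*
   * A minimal sketch (not from the original source) of the agent_queue
   * layout maintained by _enqueue()/_dequeue(): a map from priority to
   * the set of PGs waiting at that priority, scanned from the highest
   * priority down:
   *
   *   agent_queue = {
   *     10 : { pg1, pg2 },   // flush/evict candidates at priority 10
   *     40 : { pg3 },        // rbegin() == highest priority, scanned first
   *   };
   *
   * _enqueue() invalidates agent_queue_pos when a higher priority bucket
   * appears, so the agent thread restarts its scan from the top.
   */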

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    std::lock_guard l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    std::lock_guard l(agent_lock);
    ceph_assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    std::lock_guard l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    std::lock_guard l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    agent_cond.notify_all();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ++agent_ops;
    ceph_assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    ceph_assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.notify_all();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    std::lock_guard l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count++;
  }

  void agent_dec_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count--;
  }

private:
  /// throttle promotion attempts
  std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability in thousandths. one word.
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless! we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   // no throttle (promote)
  }
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();
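  /*
   * A worked example (not from the original source) of the lockless
   * throttle above: promote_probability_millis is a probability in
   * thousandths, so at the default 1000 the probability check never
   * throttles, while e.g. 250 lets roughly one in four attempts through:
   *
   *   // promote_probability_millis == 250
   *   // rand() % 1000 is uniform in [0, 999]; it exceeds 250 on ~75%
   *   // of draws, so promote_throttle() returns true (throttle) ~75%
   *   // of the time and permits a promote ~25% of the time.
   *
   * promote_throttle_recalibrate() adjusts the value so the achieved
   * promote rate tracks the configured object/byte budgets.
   */
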
  unsigned get_num_shards() const {
    return m_objecter_finishers;
  }
  Finisher* get_objecter_finisher(int shard) {
    return objecter_finishers[shard].get();
  }

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  std::unique_ptr<Objecter> objecter;
  int m_objecter_finishers;
  std::vector<std::unique_ptr<Finisher>> objecter_finishers;

  // -- Watch --
  ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    std::lock_guard l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }
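  /*
   * A worked example (not from the original source) of the notify-id
   * layout above: the current epoch fills the high 32 bits and the
   * per-OSD counter the low 32 bits, keeping ids unique across epochs:
   *
   *   // cur_epoch = 0x12, next_notif_id = 0x34
   *   // id = (0x12ULL << 32) | 0x34 = 0x0000001200000034
   */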

  // -- Recovery/Backfill Request Scheduling --
  ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
  SafeTimer recovery_request_timer;

  // For async recovery sleep
  bool recovery_needs_sleep = true;
  ceph::real_clock::time_point recovery_schedule_time;

  // For recovery & scrub & snap
  ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
  SafeTimer sleep_timer;

  // -- tids --
  // for ops i issue
  std::atomic<unsigned int> last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t> local_reserver;
  AsyncReserver<spg_t> remote_reserver;

  // -- pg merge --
  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
  map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
  map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
  set<pg_t> not_ready_to_merge_source;
  map<pg_t,pg_t> not_ready_to_merge_target;
  set<pg_t> sent_ready_to_merge_source;

  void set_ready_to_merge_source(PG *pg,
                                 eversion_t version);
  void set_ready_to_merge_target(PG *pg,
                                 eversion_t version,
                                 epoch_t last_epoch_started,
                                 epoch_t last_epoch_clean);
  void set_not_ready_to_merge_source(pg_t source);
  void set_not_ready_to_merge_target(pg_t target, pg_t source);
  void clear_ready_to_merge(PG *pg);
  void send_ready_to_merge();
  void _send_ready_to_merge();
  void clear_sent_ready_to_merge();
  void prune_sent_ready_to_merge(const OSDMapRef& osdmap);

  // -- pg_temp --
private:
  ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
  struct pg_temp_t {
    vector<int> acting;
    bool forced = false;
  };
  map<pg_t, pg_temp_t> pg_temp_wanted;
  map<pg_t, pg_temp_t> pg_temp_pending;
  void _sent_pg_temp();
  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
  void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
                          bool forced = false);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
  set<pg_t> pg_created;
  void send_pg_created(pg_t pgid);
  void prune_pg_created();
  void send_pg_created();

  AsyncReserver<spg_t> snap_reserver;
  void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
  void queue_for_snap_trim(PG *pg);
  void queue_for_scrub(PG *pg, bool with_high_priority);
  void queue_for_pg_delete(spg_t pgid, epoch_t e);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

private:
  // -- pg recovery and associated throttling --
  ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
  list<pair<epoch_t, PGRef> > awaiting_throttle;

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  map<spg_t, set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes);
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    std::lock_guard l(recovery_lock);
    return recovery_paused;
  }
  void unpause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    std::lock_guard l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);
    awaiting_throttle.remove_if(
      [pg](decltype(awaiting_throttle)::const_reference awaiting) {
        return awaiting.second.get() == pg;
      });
  }

  unsigned get_target_pg_log_entries() const;

  // delayed pg activation
  void queue_for_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);

    if (pg->is_forced_recovery_or_backfill()) {
      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }
  void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
    std::lock_guard l(recovery_lock);
    _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
  }

  void queue_check_readable(spg_t spgid,
                            epoch_t lpr,
                            ceph::signedspan delay = ceph::signedspan::zero());

  // osd map cache (past osd maps)
  ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
  SharedLRU<epoch_t, const OSDMap> map_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;

  OSDMapRef try_get_map(epoch_t e);
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    ceph_assert(ret);
    return ret;
  }
  OSDMapRef add_map(OSDMap *o) {
    std::lock_guard l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);

  void _add_map_bl(epoch_t e, bufferlist& bl);
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, bufferlist& bl);

  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
  bool get_inc_map_bl(epoch_t e, bufferlist& bl);

  /// identify split child pgids over an osdmap interval
  void identify_splits_and_merges(
    OSDMapRef old_map,
    OSDMapRef new_map,
    spg_t pgid,
    set<pair<spg_t,epoch_t>> *new_children,
    set<pair<spg_t,epoch_t>> *merge_pgs);

  void need_heartbeat_peer_update();

  void init();
  void final_init();
  void start_shutdown();
  void shutdown_reserver();
  void shutdown();

  // -- stats --
  ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
  osd_stat_t osd_stat;
  uint32_t seq = 0;

  void set_statfs(const struct store_statfs_t &stbuf,
                  osd_alert_list_t& alerts);
  osd_stat_t set_osd_stat(vector<int>& hb_peers, int num_pgs);
  void inc_osd_stat_repaired(void);
  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
  osd_stat_t get_osd_stat() {
    std::lock_guard l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  uint64_t get_osd_stat_seq() {
    std::lock_guard l(stat_lock);
    return osd_stat.seq;
  }
  void get_hb_pingtime(map<int, osd_stat_t::Interfaces> *pp)
  {
    std::lock_guard l(stat_lock);
    *pp = osd_stat.hb_pingtime;
    return;
  }

  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio, physical_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
  bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
public:
  void check_full_status(float ratio, float pratio);
  s_names recalc_full_state(float ratio, float pratio, string &inject);
  bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
  bool check_failsafe_full(DoutPrefixProvider *dpp) const;
  bool check_full(DoutPrefixProvider *dpp) const;
  bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
  bool check_backfill_full(DoutPrefixProvider *dpp) const;
  bool check_nearfull(DoutPrefixProvider *dpp) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);


  // -- epochs --
private:
  // protects access to boot_epoch, up_epoch, bind_epoch
  mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs. Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                  const epoch_t *_bind_epoch);
  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  void request_osdmap_update(epoch_t e);

  // -- heartbeats --
  ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");

  /// osd -> heartbeat stamps
  vector<HeartbeatStampsRef> hb_stamps;

  /// get or create a ref for a peer's HeartbeatStamps
  HeartbeatStampsRef get_hb_stamps(unsigned osd);


  // Timer for readable leases
  ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};

  void queue_renew_lease(epoch_t epoch, spg_t spgid);

  // -- stopping --
  ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
  ceph::condition_variable is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING };
  std::atomic<int> state{NOT_STOPPING};
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();


#ifdef PG_DEBUG_REFS
  ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
  map<spg_t, int> pgid_tracker;
  map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg);
  void remove_pgid(spg_t pgid, PG *pg);
  void dump_live_pgids();
#endif

  explicit OSDService(OSD *osd);
  ~OSDService() = default;
};

/*

  Each PG slot includes queues for events that are processing and/or waiting
  for a PG to be materialized in the slot.

  These are the constraints:

  - client ops must remain ordered by client, regardless of map epoch
  - peering messages/events from peers must remain ordered by peer
  - peering messages and client ops need not be ordered relative to each other

  - some peering events can create a pg (e.g., notify)
  - the query peering event can proceed when a PG doesn't exist

  Implementation notes:

  - everybody waits for split.  If the OSD has the parent PG it will instantiate
    the PGSlot early and mark it waiting_for_split.  Everything will wait until
    the parent is able to commit the split operation and the child PG's are
    materialized in the child slots.

  - every event has an epoch property and will wait for the OSDShard to catch
    up to that epoch.  For example, if we get a peering event from a future
    epoch, the event will wait in the slot until the local OSD has caught up.
    (We should be judicious in specifying the required epoch [by, e.g., setting
    it to the same_interval_since epoch] so that we don't wait for epochs that
    don't affect the given PG.)

  - we maintain two separate wait lists, *waiting* and *waiting_peering*.  The
    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
    peering events are queued up by epoch required.

  - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
    materialized the PG), we wake *all* waiting items.  (This could be optimized,
    probably, but we don't bother.)  We always requeue peering items ahead of
    client ops.

  - some peering events are marked !peering_requires_pg (PGQuery).  if we do
    not have a PG these are processed immediately (under the shard lock).

  - if we do not have a PG present, we check if the slot maps to the current
    host.  if so, we either queue the item and wait for the PG to materialize,
    or (if the event is a pg creating event like PGNotify), we materialize the
    PG.

  - when we advance the osdmap on the OSDShard, we scan pg slots and
    discard any slots with no pg (and not waiting_for_split) that no
    longer map to the current host.

 */

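/*
 * A minimal sketch (not from the original source) of the two wait lists
 * described above, for one slot whose PG has not materialized yet:
 *
 *   slot->waiting         = [ client op A, client op B ];     // FIFO
 *   slot->waiting_peering = { 100: [evt], 103: [evt, evt] };  // by epoch
 *
 * Once the shard's osdmap reaches epoch 103 and the PG materializes,
 * waking the slot requeues the peering events ahead of A and B, which
 * preserves per-peer and per-client ordering.
 */
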
struct OSDShardPGSlot {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
  PGRef pg;                           ///< pg reference
  deque<OpSchedulerItem> to_process;  ///< ordered items for this slot
  int num_running = 0;                ///< _process threads doing pg lookup/lock

  deque<OpSchedulerItem> waiting;     ///< waiting for pg (or map + pg)

  /// waiting for map (peering evt)
  map<epoch_t,deque<OpSchedulerItem>> waiting_peering;

  /// incremented by wake_pg_waiters; indicates racing _process threads
  /// should bail out (their op has been requeued)
  uint64_t requeue_seq = 0;

  /// waiting for split child to materialize in these epoch(s)
  set<epoch_t> waiting_for_split;

  epoch_t epoch = 0;
  boost::intrusive::set_member_hook<> pg_epoch_item;

  /// waiting for a merge (source or target) by this epoch
  epoch_t waiting_for_merge_epoch = 0;
};

struct OSDShard {
  const unsigned shard_id;
  CephContext *cct;
  OSD *osd;

  string shard_name;

  string sdata_wait_lock_name;
  ceph::mutex sdata_wait_lock;
  ceph::condition_variable sdata_cond;

  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
  OSDMapRef shard_osdmap;

  OSDMapRef get_osdmap() {
    std::lock_guard l(osdmap_lock);
    return shard_osdmap;
  }

  string shard_lock_name;
  ceph::mutex shard_lock;   ///< protects remaining members below

  /// map of slots for each spg_t. maintains ordering of items dequeued
  /// from scheduler while _process thread drops shard lock to acquire the
  /// pg lock. stale slots are removed by consume_map.
  unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;

  struct pg_slot_compare_by_epoch {
    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
      return l.epoch < r.epoch;
    }
  };

  /// maintain an ordering of pg slots by pg epoch
  boost::intrusive::multiset<
    OSDShardPGSlot,
    boost::intrusive::member_hook<
      OSDShardPGSlot,
      boost::intrusive::set_member_hook<>,
      &OSDShardPGSlot::pg_epoch_item>,
    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
  int waiting_for_min_pg_epoch = 0;
  ceph::condition_variable min_pg_epoch_cond;

  /// priority queue
  ceph::osd::scheduler::OpSchedulerRef scheduler;

  bool stop_waiting = false;

  ContextQueue context_queue;

  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
  void _detach_pg(OSDShardPGSlot *slot);

  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
  epoch_t get_min_pg_epoch();
  void wait_min_pg_epoch(epoch_t need);

  /// return newest epoch we are waiting for
  epoch_t get_max_waiting_epoch();

  /// push osdmap into shard
  void consume_map(
    const OSDMapRef& osdmap,
    unsigned *pushes_to_free);

  void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);

  void identify_splits_and_merges(
    const OSDMapRef& as_of_osdmap,
    set<pair<spg_t,epoch_t>> *split_children,
    set<pair<spg_t,epoch_t>> *merge_pgs);
  void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
  void prime_splits(const OSDMapRef& as_of_osdmap,
                    set<pair<spg_t,epoch_t>> *pgids);
  void prime_merges(const OSDMapRef& as_of_osdmap,
                    set<pair<spg_t,epoch_t>> *merge_pgs);
  void register_and_wake_split_child(PG *pg);
  void unprime_split_children(spg_t parent, unsigned old_pg_num);

  OSDShard(
    int id,
    CephContext *cct,
    OSD *osd);
};

class OSD : public Dispatcher,
            public md_config_obs_t {
  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;

  /** OSD **/
  // global lock
  ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
  SafeTimer tick_timer;    // safe timer (osd_lock)

  // Tick timer for things that do not need osd_lock
  ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
  SafeTimer tick_timer_without_osd_lock;
  std::string gss_ktfile_client{};

public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set <std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:

  const double OSD_TICK_INTERVAL = { 1.0 };
  double get_tick_interval() const;

  Messenger *cluster_messenger;
  Messenger *client_messenger;
  Messenger *objecter_messenger;
  MonClient *monc; // check the "monc helpers" list before accessing directly
  MgrClient mgrc;
  PerfCounters *logger;
  PerfCounters *recoverystate_perf;
  ObjectStore *store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  ceph_release_t last_require_osd_release{ceph_release_t::unknown};

  int numa_node = -1;
  size_t numa_cpu_set_size = 0;
  cpu_set_t numa_cpu_set;

  bool store_is_rotational = true;
  bool journal_is_rotational = true;

  ZTracer::Endpoint trace_endpoint;
  void create_logger();
  void create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features();

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  void asok_command(
    std::string_view prefix,
    const cmdmap_t& cmdmap,
    Formatter *f,
    const bufferlist& inbl,
    std::function<void(int,const std::string&,bufferlist&)> on_finish);

public:
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("snapmapper"),
        0)));
  }
  static ghobject_t make_purged_snaps_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("purged_snaps"),
        0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    stringstream ss;
    ss << "pglog_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    stringstream ss;
    ss << "pginfo_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }

  static ghobject_t make_final_pool_info_oid(int64_t pool) {
    return ghobject_t(
      hobject_t(
        sobject_t(
          object_t(string("final_pool_") + stringify(pool)),
          CEPH_NOSNAP)));
  }

  static ghobject_t make_pg_num_history_oid() {
    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
  }

  static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
                                          coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD.  Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();


private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- config settings --
  float m_osd_pg_epoch_max_lag_factor;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic<int> state{STATE_INITIALIZING};

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:

  ShardedThreadPool osd_op_tp;

  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);

  ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
  set<ceph::ref_t<Session>> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) {
    std::lock_guard l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.insert(session);
  }
  void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.erase(session);
  }
  void dispatch_sessions_waiting_on_map() {
    set<ceph::ref_t<Session>> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
      std::lock_guard l{(*i)->session_dispatch_lock};
      dispatch_session_waiting(*i, get_osdmap());
    }
  }
  void session_handle_reset(const ceph::ref_t<Session>& session) {
    std::lock_guard l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   * the monitors; false if an outstanding request that encompasses it is
   * sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
  epoch_t latest_subscribed_epoch{0};

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                  ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;          ///< time we sent our first ping request
    utime_t last_tx;           ///< last time we sent a ping request
    utime_t last_rx_front;     ///< last time we got a ping reply on the front side
    utime_t last_rx_back;      ///< last time we got a ping reply on the back side
    epoch_t epoch;             ///< most recent epoch we wanted this peer
    /// number of connections we send and receive heartbeat pings/replies
    static constexpr int HEARTBEAT_MAX_CONN = 2;
    /// history of inflight pings, arranged by timestamp we sent
    /// send time -> deadline -> remaining replies
    map<utime_t, pair<utime_t, int>> ping_history;

    utime_t hb_interval_start;
    uint32_t hb_average_count = 0;
    uint32_t hb_index = 0;

    uint32_t hb_total_back = 0;
    uint32_t hb_min_back = UINT_MAX;
    uint32_t hb_max_back = 0;
    vector<uint32_t> hb_back_pingtime;
    vector<uint32_t> hb_back_min;
    vector<uint32_t> hb_back_max;

    uint32_t hb_total_front = 0;
    uint32_t hb_min_front = UINT_MAX;
    uint32_t hb_max_front = 0;
    vector<uint32_t> hb_front_pingtime;
    vector<uint32_t> hb_front_min;
    vector<uint32_t> hb_front_max;

    bool is_stale(utime_t stale) {
      if (ping_history.empty()) {
        return false;
      }
      utime_t oldest_deadline = ping_history.begin()->second.first;
      return oldest_deadline <= stale;
    }

    bool is_unhealthy(utime_t now) {
      if (ping_history.empty()) {
        /// we haven't sent a ping yet, or we have received all replies;
        /// either way we are safe and healthy for now
        return false;
      }

      utime_t oldest_deadline = ping_history.begin()->second.first;
      return now > oldest_deadline;
    }

    bool is_healthy(utime_t now) {
      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
        // only declare healthy once we have received the first
        // replies from both front/back connections
        return false;
      }
      return !is_unhealthy(now);
    }
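
    /*
     * A worked sketch (not from the original source) of ping_history:
     * each entry maps the send time to (deadline, replies still
     * expected across the front+back connections):
     *
     *   ping_history = {
     *     t1: { t1+grace, 2 },  // neither front nor back replied yet
     *     t2: { t2+grace, 1 },  // one of the two connections replied
     *   };
     *
     * is_unhealthy(now) only inspects the oldest outstanding deadline;
     * presumably an entry is dropped once its reply count reaches zero,
     * which is why an empty history counts as healthy.
     */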

    void clear_mark_down(Connection *except = nullptr) {
      if (con_back && con_back != except) {
        con_back->mark_down();
        con_back->clear_priv();
        con_back.reset(nullptr);
      }
      if (con_front && con_front != except) {
        con_front->mark_down();
        con_front->clear_priv();
        con_front.reset(nullptr);
      }
    }
  };

  ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
  map<int, int> debug_heartbeat_drops_remaining;
  ceph::condition_variable heartbeat_cond;
  bool heartbeat_stop;
  std::atomic<bool> heartbeat_need_update;
  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
  utime_t last_mon_heartbeat;
  Messenger *hb_front_client_messenger;
  Messenger *hb_back_client_messenger;
  Messenger *hb_front_server_messenger;
  Messenger *hb_back_server_messenger;
  utime_t last_heartbeat_resample;  ///< last time we chose random peers in waiting-for-healthy state
  double daily_loadavg;
  ceph::mono_time startup_time;

  // Track ping response times using a vector as a circular buffer
  // MUST BE A POWER OF 2
  const uint32_t hb_vector_size = 16;

  void _add_heartbeat_peer(int p);
  void _remove_heartbeat_peer(int p);
  bool heartbeat_reset(Connection *con);
  void maybe_update_heartbeat_peers();
  void reset_heartbeat_peers(bool all);
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  void heartbeat_kick() {
    std::lock_guard l(heartbeat_lock);
    heartbeat_cond.notify_all();
  }

  struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
      osd->heartbeat_entry();
      return 0;
    }
  } heartbeat_thread;

public:
  bool heartbeat_dispatch(Message *m);

  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
        return true;
      default:
        return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    int ms_handle_authentication(Connection *con) override {
      return true;
    }
  } heartbeat_dispatcher;

private:
  // -- waiters --
  list<OpRequestRef> finished;

  void take_waiters(list<OpRequestRef>& ls) {
    ceph_assert(ceph_mutex_is_locked(osd_lock));
    finished.splice(finished.end(), ls);
  }
  void do_waiters();

  // -- op tracking --
  OpTracker op_tracker;
  void test_ops(std::string command, std::string args, ostream& ss);
  friend class TestOpsSocketHook;
  TestOpsSocketHook *test_ops_hook;
  friend struct C_FinishSplits;
  friend struct C_OpenPGs;

protected:

  /*
   * The ordered op delivery chain is:
   *
   *   fast dispatch -> scheduler back
   *                    scheduler front <-> to_process back
   *                                        to_process front -> RunVis(item)
   *                                                         <- queue_front()
   *
   * The scheduler is per-shard, and to_process is per pg_slot.  Items can be
   * pushed back up into to_process and/or scheduler while order is preserved.
   *
   * Multiple worker threads can operate on each shard.
   *
   * Under normal circumstances, num_running == to_process.size().  There are
   * two times when that is not true: (1) when waiting_for_pg == true and
   * to_process is accumulating requests that are waiting for the pg to be
   * instantiated; in that case they will all get requeued together by
   * wake_pg_waiters, and (2) right after wake_pg_waiters ran: it cleared
   * waiting_for_pg and already requeued the items.
   */
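  /*
   * A minimal sketch (not from the original source) of how requeue_seq
   * guards the requeue race described above: _process() samples the
   * slot's sequence before dropping the shard lock, and bails out if
   * wake_pg_waiters bumped it in the meantime:
   *
   *   uint64_t seq = slot->requeue_seq;
   *   // ... drop shard_lock, take the pg lock, retake shard_lock ...
   *   if (slot->requeue_seq != seq) {
   *     return;  // our item was requeued; another thread will run it
   *   }
   */
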
9f95a23c
TL
1563 friend class ceph::osd::scheduler::PGOpItem;
1564 friend class ceph::osd::scheduler::PGPeeringItem;
1565 friend class ceph::osd::scheduler::PGRecovery;
1566 friend class ceph::osd::scheduler::PGDelete;
224ce89b 1567
7c673cae 1568 class ShardedOpWQ
9f95a23c 1569 : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
7c673cae 1570 {
7c673cae 1571 OSD *osd;
7c673cae
FG
1572
1573 public:
11fdf7f2 1574 ShardedOpWQ(OSD *o,
7c673cae
FG
1575 time_t ti,
1576 time_t si,
1577 ShardedThreadPool* tp)
9f95a23c 1578 : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
11fdf7f2 1579 osd(o) {
7c673cae
FG
1580 }
1581
11fdf7f2
TL
1582 void _add_slot_waiter(
1583 spg_t token,
1584 OSDShardPGSlot *slot,
9f95a23c 1585 OpSchedulerItem&& qi);
7c673cae
FG
1586
1587 /// try to do some work
1588 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1589
1590 /// enqueue a new item
9f95a23c 1591 void _enqueue(OpSchedulerItem&& item) override;
7c673cae
FG
1592
1593 /// requeue an old item (at the front of the line)
9f95a23c 1594 void _enqueue_front(OpSchedulerItem&& item) override;
7c673cae
FG
1595
1596 void return_waiting_threads() override {
11fdf7f2
TL
1597 for(uint32_t i = 0; i < osd->num_shards; i++) {
1598 OSDShard* sdata = osd->shards[i];
1599 assert (NULL != sdata);
1600 std::scoped_lock l{sdata->sdata_wait_lock};
1601 sdata->stop_waiting = true;
1602 sdata->sdata_cond.notify_all();
7c673cae
FG
1603 }
1604 }
1605
11fdf7f2
TL
1606 void stop_return_waiting_threads() override {
1607 for(uint32_t i = 0; i < osd->num_shards; i++) {
1608 OSDShard* sdata = osd->shards[i];
7c673cae 1609 assert (NULL != sdata);
11fdf7f2
TL
1610 std::scoped_lock l{sdata->sdata_wait_lock};
1611 sdata->stop_waiting = false;
1612 }
1613 }
1614
1615 void dump(Formatter *f) {
1616 for(uint32_t i = 0; i < osd->num_shards; i++) {
1617 auto &&sdata = osd->shards[i];
1618
1619 char queue_name[32] = {0};
1620 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1621 ceph_assert(NULL != sdata);
1622
1623 std::scoped_lock l{sdata->shard_lock};
1624 f->open_object_section(queue_name);
9f95a23c 1625 sdata->scheduler->dump(*f);
7c673cae 1626 f->close_section();
7c673cae
FG
1627 }
1628 }
1629
11fdf7f2
TL
1630 bool is_shard_empty(uint32_t thread_index) override {
1631 uint32_t shard_index = thread_index % osd->num_shards;
1632 auto &&sdata = osd->shards[shard_index];
1633 ceph_assert(sdata);
1634 std::lock_guard l(sdata->shard_lock);
1635 if (thread_index < osd->num_shards) {
9f95a23c 1636 return sdata->scheduler->empty() && sdata->context_queue.empty();
11fdf7f2 1637 } else {
9f95a23c 1638 return sdata->scheduler->empty();
7c673cae 1639 }
11fdf7f2 1640 }
7c673cae 1641
11fdf7f2
TL
1642 void handle_oncommits(list<Context*>& oncommits) {
1643 for (auto p : oncommits) {
1644 p->complete(0);
1645 }
7c673cae
FG
1646 }
1647 } op_shardedwq;

  void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
  void dequeue_op(
    PGRef pg, OpRequestRef op,
    ThreadPool::TPHandle &handle);

  void enqueue_peering_evt(
    spg_t pgid,
    PGPeeringEventRef ref);
  void dequeue_peering_evt(
    OSDShard *sdata,
    PG *pg,
    PGPeeringEventRef ref,
    ThreadPool::TPHandle& handle);

  void dequeue_delete(
    OSDShard *sdata,
    PG *pg,
    epoch_t epoch,
    ThreadPool::TPHandle& handle);

  friend class PG;
  friend class OSDShard;
  friend class PrimaryLogPG;


protected:

  // -- osd map --
  // TODO: switch to std::atomic<OSDMapRef> once C++20 is available.
  OSDMapRef _osdmap;
  void set_osdmap(OSDMapRef osdmap) {
    std::atomic_store(&_osdmap, osdmap);
  }
  OSDMapRef get_osdmap() const {
    return std::atomic_load(&_osdmap);
  }
  epoch_t get_osdmap_epoch() const {
    // XXX: performance?
    auto osdmap = get_osdmap();
    return osdmap ? osdmap->get_epoch() : 0;
  }
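
  /*
   * Illustrative sketch (not in the original header): under C++20 the
   * free-function atomic shared_ptr operations used above (deprecated
   * in C++20) can be replaced by the std::atomic<std::shared_ptr<T>>
   * specialization the TODO refers to, e.g.:
   *
   *   std::atomic<OSDMapRef> _osdmap;   // OSDMapRef is a shared_ptr
   *   void set_osdmap(OSDMapRef osdmap) {
   *     _osdmap.store(std::move(osdmap));
   *   }
   *   OSDMapRef get_osdmap() const {
   *     return _osdmap.load();
   *   }
   */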

  pool_pg_num_history_t pg_num_history;

  ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
  list<OpRequestRef> waiting_for_osdmap;
  deque<utime_t> osd_markdown_log;

  friend struct send_map_on_destruct;

  void wait_for_new_map(OpRequestRef op);
  void handle_osd_map(class MOSDMap *m);
  void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
  void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
  void note_down_osd(int osd);
  void note_up_osd(int osd);
  friend class C_OnMapCommit;

  bool advance_pg(
    epoch_t advance_to,
    PG *pg,
    ThreadPool::TPHandle &handle,
    PeeringCtx &rctx);
  void consume_map();
  void activate_map();

  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  OSDMapRef add_map(OSDMap *o) {
    return service.add_map(o);
  }
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    return service.get_map_bl(e, bl);
  }

public:
  // -- shards --
  vector<OSDShard*> shards;
  uint32_t num_shards = 0;

  void inc_num_pgs() {
    ++num_pgs;
  }
  void dec_num_pgs() {
    --num_pgs;
  }
  int get_num_pgs() const {
    return num_pgs;
  }

protected:
  ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
  /// merge epoch -> target pgid -> source pgid -> pg
  map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;

  bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
                        unsigned need);

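  /*
   * Illustrative sketch (not in the original header): merge_waiters is
   * keyed exactly as documented above, so parking a merge source under
   * its target for the epoch of nextmap looks conceptually like this
   * (member access is hypothetical):
   *
   *   std::lock_guard l(merge_lock);
   *   auto& waiters = merge_waiters[nextmap->get_epoch()][target];
   *   waiters[source->pg_id] = source;
   *   bool all_sources_present = (waiters.size() >= need);
   */
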
  // -- placement groups --
  std::atomic<size_t> num_pgs = {0};

  std::mutex pending_creates_lock;
  using create_from_osd_t = std::pair<spg_t, bool /* is primary */>;
  std::set<create_from_osd_t> pending_creates_from_osd;
  unsigned pending_creates_from_mon = 0;

  PGRecoveryStats pg_recovery_stats;

  PGRef _lookup_pg(spg_t pgid);
  PGRef _lookup_lock_pg(spg_t pgid);
  void register_pg(PGRef pg);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

  void _get_pgs(vector<PGRef> *v, bool clear_too=false);
  void _get_pgids(vector<spg_t> *v);

public:
  PGRef lookup_lock_pg(spg_t pgid);

  std::set<int64_t> get_mapped_pools();

protected:
  PG* _make_pg(OSDMapRef createmap, spg_t pgid);

  bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
                             spg_t pgid, bool is_mon_create);
  void resume_creating_pg();

  void load_pgs();

  /// build initial pg history and intervals on create
  void build_initial_pg_history(
    spg_t pgid,
    epoch_t created,
    utime_t created_stamp,
    pg_history_t *h,
    PastIntervals *pi);

  epoch_t last_pg_create_epoch;

  void handle_pg_create(OpRequestRef op);

  void split_pgs(
    PG *parent,
    const set<spg_t> &childpgids, set<PGRef> *out_pgs,
    OSDMapRef curmap,
    OSDMapRef nextmap,
    PeeringCtx &rctx);
  void _finish_splits(set<PGRef>& pgs);

  // == monitor interaction ==
  ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
  utime_t last_mon_report;
  Finisher boot_finisher;

  // -- boot --
  void start_boot();
  void _got_mon_epochs(epoch_t oldest, epoch_t newest);
  void _preboot(epoch_t oldest, epoch_t newest);
  void _send_boot();
  void _collect_metadata(map<string,string> *pmeta);
  void _get_purged_snaps();
  void handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *r);

  void start_waiting_for_healthy();
  bool _is_healthy();

  void send_full_update();

  friend struct C_OSD_GetVersion;

  // -- alive --
  epoch_t up_thru_wanted;

  void queue_want_up_thru(epoch_t want);
  void send_alive();

  // -- full map requests --
  epoch_t requested_full_first, requested_full_last;

  void request_full_map(epoch_t first, epoch_t last);
  void rerequest_full_maps() {
    epoch_t first = requested_full_first;
    epoch_t last = requested_full_last;
    requested_full_first = 0;
    requested_full_last = 0;
    request_full_map(first, last);
  }
  void got_full_map(epoch_t e);

  // -- failures --
  map<int,utime_t> failure_queue;
  map<int,pair<utime_t,entity_addrvec_t> > failure_pending;

  void requeue_failures();
  void send_failures();
  void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
  void cancel_pending_failures();

  ceph::coarse_mono_clock::time_point last_sent_beacon;
  ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
  epoch_t min_last_epoch_clean = 0;
  // which pgs were scanned for min_lec
  std::vector<pg_t> min_last_epoch_clean_pgs;
  void send_beacon(const ceph::coarse_mono_clock::time_point& now);

  ceph_tid_t get_tid() {
    return service.get_tid();
  }

  double scrub_sleep_time(bool must_scrub);

  // -- generic pg peering --
  PeeringCtx create_context();
  void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                        ThreadPool::TPHandle *handle = NULL);

  bool require_mon_peer(const Message *m);
  bool require_mon_or_mgr_peer(const Message *m);
  bool require_osd_peer(const Message *m);
  /**
   * Verifies that we were alive in the given epoch, and that we
   * still are.
   */
  bool require_self_aliveness(const Message *m, epoch_t alive_since);
  /**
   * Verifies that the OSD who sent the given op has the same
   * address as in the given map.
   * @pre op was sent by an OSD using the cluster messenger
   */
  bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
                                  bool is_fast_dispatch);

  bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                                 bool is_fast_dispatch);

  void handle_fast_pg_create(MOSDPGCreate2 *m);
  void handle_fast_pg_query(MOSDPGQuery *m);
  void handle_pg_query_nopg(const MQuery& q);
  void handle_fast_pg_notify(MOSDPGNotify *m);
  void handle_pg_notify_nopg(const MNotifyRec& q);
  void handle_fast_pg_info(MOSDPGInfo *m);
  void handle_fast_pg_remove(MOSDPGRemove *m);

public:
  // used by OSDShard
  PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
protected:

  void handle_fast_force_recovery(MOSDForceRecovery *m);

  // -- commands --
  void handle_command(class MCommand *m);

  // -- pg recovery --
  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                   ThreadPool::TPHandle &handle);

  // -- scrubbing --
  void sched_scrub();
  void resched_all_scrubs();
  bool scrub_random_backoff();
  bool scrub_load_below_threshold();
  bool scrub_time_permit(utime_t now);

  // -- status reporting --
  MPGStats *collect_pg_stats();
  std::vector<DaemonHealthMetric> get_health_metrics();

private:
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
    case CEPH_MSG_OSD_OP:
    case CEPH_MSG_OSD_BACKOFF:
    case MSG_OSD_SCRUB2:
    case MSG_OSD_FORCE_RECOVERY:
    case MSG_MON_COMMAND:
    case MSG_OSD_PG_CREATE2:
    case MSG_OSD_PG_QUERY:
    case MSG_OSD_PG_QUERY2:
    case MSG_OSD_PG_INFO:
    case MSG_OSD_PG_INFO2:
    case MSG_OSD_PG_NOTIFY:
    case MSG_OSD_PG_NOTIFY2:
    case MSG_OSD_PG_LOG:
    case MSG_OSD_PG_TRIM:
    case MSG_OSD_PG_REMOVE:
    case MSG_OSD_BACKFILL_RESERVE:
    case MSG_OSD_RECOVERY_RESERVE:
    case MSG_OSD_REPOP:
    case MSG_OSD_REPOPREPLY:
    case MSG_OSD_PG_PUSH:
    case MSG_OSD_PG_PULL:
    case MSG_OSD_PG_PUSH_REPLY:
    case MSG_OSD_PG_SCAN:
    case MSG_OSD_PG_BACKFILL:
    case MSG_OSD_PG_BACKFILL_REMOVE:
    case MSG_OSD_EC_WRITE:
    case MSG_OSD_EC_WRITE_REPLY:
    case MSG_OSD_EC_READ:
    case MSG_OSD_EC_READ_REPLY:
    case MSG_OSD_SCRUB_RESERVE:
    case MSG_OSD_REP_SCRUB:
    case MSG_OSD_REP_SCRUBMAP:
    case MSG_OSD_PG_UPDATE_LOG_MISSING:
    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    case MSG_OSD_PG_RECOVERY_DELETE:
    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    case MSG_OSD_PG_LEASE:
    case MSG_OSD_PG_LEASE_ACK:
      return true;
    default:
      return false;
    }
  }
  void ms_fast_dispatch(Message *m) override;
  bool ms_dispatch(Message *m) override;
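
  /*
   * Illustrative sketch (not in the original header, and ignoring
   * access control for clarity): conceptually the messenger consults
   * the whitelist above to pick the delivery path for each message:
   *
   *   void example_deliver(OSD *osd, Message *m) {
   *     if (osd->ms_can_fast_dispatch(m)) {
   *       osd->ms_fast_dispatch(m);  // handled inline, no dispatch queue
   *     } else {
   *       osd->ms_dispatch(m);       // ordered, slower dispatch-queue path
   *     }
   *   }
   */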
  void ms_handle_connect(Connection *con) override;
  void ms_handle_fast_connect(Connection *con) override;
  void ms_handle_fast_accept(Connection *con) override;
  int ms_handle_authentication(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override;

public:
  /* internal and external can point to the same messenger; they will
   * still be cleaned up properly */
  OSD(CephContext *cct_,
      ObjectStore *store_,
      int id,
      Messenger *internal,
      Messenger *external,
      Messenger *hb_front_client,
      Messenger *hb_back_client,
      Messenger *hb_front_server,
      Messenger *hb_back_server,
      Messenger *osdc_messenger,
      MonClient *mc, const std::string &dev, const std::string &jdev);
  ~OSD() override;

  // static bits
  static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid,
                  int whoami, std::string osdspec_affinity);

  /* remove any non-user xattrs from a map of them */
  void filter_xattrs(map<string, bufferptr>& attrs) {
    for (map<string, bufferptr>::iterator iter = attrs.begin();
         iter != attrs.end();
         ) {
      // user xattrs are keyed "_<name>"; drop everything else,
      // including the bare "_" key
      if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
        attrs.erase(iter++);
      else
        ++iter;
    }
  }

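  /*
   * Illustrative usage sketch (not in the original header): given
   *
   *   attrs = { "_user.note": ..., "_": ..., "snapset": ... }
   *
   * filter_xattrs(attrs) keeps only "_user.note": "snapset" fails the
   * leading-'_' test and the bare "_" key fails the size test, so both
   * are erased.
   */
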
private:
  int mon_cmd_maybe_osd_create(string &cmd);
  int update_crush_device_class();
  int update_crush_location();

  static int write_meta(CephContext *cct,
                        ObjectStore *store,
                        uuid_d& cluster_fsid, uuid_d& osd_fsid,
                        int whoami, std::string& osdspec_affinity);

  void handle_scrub(struct MOSDScrub *m);
  void handle_fast_scrub(struct MOSDScrub2 *m);
  void handle_osd_ping(class MOSDPing *m);

  size_t get_num_cache_shards();
  int get_num_op_shards();
  int get_num_op_threads();

  float get_osd_recovery_sleep();
  float get_osd_delete_sleep();
  float get_osd_snap_trim_sleep();

  int get_recovery_max_active();

  void scrub_purged_snaps();
  void probe_smart(const string& devid, ostream& ss);

public:
  static int peek_meta(ObjectStore *store,
                       string *magic,
                       uuid_d *cluster_fsid,
                       uuid_d *osd_fsid,
                       int *whoami,
                       ceph_release_t *min_osd_release);

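  /*
   * Illustrative usage sketch (not in the original header), matching
   * the signature declared above:
   *
   *   std::string magic;
   *   uuid_d cluster_fsid, osd_fsid;
   *   int whoami;
   *   ceph_release_t min_release;
   *   int r = OSD::peek_meta(store, &magic, &cluster_fsid, &osd_fsid,
   *                          &whoami, &min_release);
   *   if (r < 0) {
   *     // metadata could not be read; the store is not usable as an OSD
   *   }
   */
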
  // startup/shutdown
  int pre_init();
  int init();
  void final_init();

  int enable_disable_fuse(bool stop);
  int set_numa_affinity();

  void suicide(int exitcode);
  int shutdown();

  void handle_signal(int signum);

  /// check if we can throw out an op from a disconnected client
  static bool op_is_discardable(const MOSDOp *m);

public:
  OSDService service;
  friend class OSDService;

private:
  void set_perf_queries(const ConfigPayload &config_payload);
  MetricPayload get_perf_reports();

  ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
  std::list<OSDPerfMetricQuery> m_perf_queries;
  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
2074};
2075
224ce89b 2076
7c673cae
FG
2077//compatibility of the executable
2078extern const CompatSet::Feature ceph_osd_feature_compat[];
2079extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2080extern const CompatSet::Feature ceph_osd_feature_incompat[];
2081
224ce89b 2082#endif // CEPH_OSD_H