// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/Mutex.h"
#include "common/RWLock.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/config_cacher.h"
#include "common/zipkin_trace.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"
#include "OSDCap.h"

#include "auth/KeyRing.h"

#include "osd/ClassHandler.h"

#include "include/CompatSet.h"

#include "OpRequest.h"
#include "Session.h"

#include "osd/OpQueueItem.h"

#include <atomic>
#include <map>
#include <memory>
#include <string>

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
#include "osd/mClockOpClassQueue.h"
#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"

#define CEPH_OSD_PROTOCOL 10 /* cluster internal */

/*

  lock ordering for pg map

    PG::lock
      ShardData::lock
        OSD::pg_map_lock

 */
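
/*
  Illustrative sketch (not part of the upstream header): code that needs
  more than one of these locks must take them in the order listed above,
  e.g. (hypothetical caller):

    pg->lock();                                // PG::lock first
    {
      std::lock_guard l{shard->shard_lock};    // then the shard's lock
      // ... consult or update shard state ...
    }
    pg->unlock();

  Acquiring them in the reverse order risks deadlock against threads that
  follow the documented order.
 */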

enum {
  l_osd_first = 10000,
  l_osd_op_wip,
  l_osd_op,
  l_osd_op_inb,
  l_osd_op_outb,
  l_osd_op_lat,
  l_osd_op_process_lat,
  l_osd_op_prepare_lat,
  l_osd_op_r,
  l_osd_op_r_outb,
  l_osd_op_r_lat,
  l_osd_op_r_lat_outb_hist,
  l_osd_op_r_process_lat,
  l_osd_op_r_prepare_lat,
  l_osd_op_w,
  l_osd_op_w_inb,
  l_osd_op_w_lat,
  l_osd_op_w_lat_inb_hist,
  l_osd_op_w_process_lat,
  l_osd_op_w_prepare_lat,
  l_osd_op_rw,
  l_osd_op_rw_inb,
  l_osd_op_rw_outb,
  l_osd_op_rw_lat,
  l_osd_op_rw_lat_inb_hist,
  l_osd_op_rw_lat_outb_hist,
  l_osd_op_rw_process_lat,
  l_osd_op_rw_prepare_lat,

  l_osd_op_before_queue_op_lat,
  l_osd_op_before_dequeue_op_lat,

  l_osd_sop,
  l_osd_sop_inb,
  l_osd_sop_lat,
  l_osd_sop_w,
  l_osd_sop_w_inb,
  l_osd_sop_w_lat,
  l_osd_sop_pull,
  l_osd_sop_pull_lat,
  l_osd_sop_push,
  l_osd_sop_push_inb,
  l_osd_sop_push_lat,

  l_osd_pull,
  l_osd_push,
  l_osd_push_outb,

  l_osd_rop,
  l_osd_rbytes,

  l_osd_loadavg,
  l_osd_cached_crc,
  l_osd_cached_crc_adjusted,
  l_osd_missed_crc,

  l_osd_pg,
  l_osd_pg_primary,
  l_osd_pg_replica,
  l_osd_pg_stray,
  l_osd_pg_removing,
  l_osd_hb_to,
  l_osd_map,
  l_osd_mape,
  l_osd_mape_dup,

  l_osd_waiting_for_map,

  l_osd_map_cache_hit,
  l_osd_map_cache_miss,
  l_osd_map_cache_miss_low,
  l_osd_map_cache_miss_low_avg,
  l_osd_map_bl_cache_hit,
  l_osd_map_bl_cache_miss,

  l_osd_stat_bytes,
  l_osd_stat_bytes_used,
  l_osd_stat_bytes_avail,

  l_osd_copyfrom,

  l_osd_tier_promote,
  l_osd_tier_flush,
  l_osd_tier_flush_fail,
  l_osd_tier_try_flush,
  l_osd_tier_try_flush_fail,
  l_osd_tier_evict,
  l_osd_tier_whiteout,
  l_osd_tier_dirty,
  l_osd_tier_clean,
  l_osd_tier_delay,
  l_osd_tier_proxy_read,
  l_osd_tier_proxy_write,

  l_osd_agent_wake,
  l_osd_agent_skip,
  l_osd_agent_flush,
  l_osd_agent_evict,

  l_osd_object_ctx_cache_hit,
  l_osd_object_ctx_cache_total,

  l_osd_op_cache_hit,
  l_osd_tier_flush_lat,
  l_osd_tier_promote_lat,
  l_osd_tier_r_lat,

  l_osd_pg_info,
  l_osd_pg_fastinfo,
  l_osd_pg_biginfo,

  l_osd_last,
};

// RecoveryState perf counters
enum {
  rs_first = 20000,
  rs_initial_latency,
  rs_started_latency,
  rs_reset_latency,
  rs_start_latency,
  rs_primary_latency,
  rs_peering_latency,
  rs_backfilling_latency,
  rs_waitremotebackfillreserved_latency,
  rs_waitlocalbackfillreserved_latency,
  rs_notbackfilling_latency,
  rs_repnotrecovering_latency,
  rs_repwaitrecoveryreserved_latency,
  rs_repwaitbackfillreserved_latency,
  rs_reprecovering_latency,
  rs_activating_latency,
  rs_waitlocalrecoveryreserved_latency,
  rs_waitremoterecoveryreserved_latency,
  rs_recovering_latency,
  rs_recovered_latency,
  rs_clean_latency,
  rs_active_latency,
  rs_replicaactive_latency,
  rs_stray_latency,
  rs_getinfo_latency,
  rs_getlog_latency,
  rs_waitactingchange_latency,
  rs_incomplete_latency,
  rs_down_latency,
  rs_getmissing_latency,
  rs_waitupthru_latency,
  rs_notrecovering_latency,
  rs_last,
};

class Messenger;
class Message;
class MonClient;
class PerfCounters;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;
class KeyStore;

class Watch;
class PrimaryLogPG;

class TestOpsSocketHook;
struct C_FinishSplits;
struct C_OpenPGs;
class LogChannel;
class CephContext;
class MOSDOp;

class MOSDPGCreate2;
class MOSDPGQuery;
class MOSDPGNotify;
class MOSDPGInfo;
class MOSDPGRemove;
class MOSDForceRecovery;

class OSD;

class OSDService {
public:
  OSD *osd;
  CephContext *cct;
  ObjectStore::CollectionHandle meta_ch;
  const int whoami;
  ObjectStore *&store;
  LogClient &log_client;
  LogChannelRef clog;
  PGRecoveryStats &pg_recovery_stats;
private:
  Messenger *&cluster_messenger;
  Messenger *&client_messenger;
public:
  PerfCounters *&logger;
  PerfCounters *&recoverystate_perf;
  MonClient *&monc;
  ClassHandler *&class_handler;

  md_config_cacher_t<Option::size_t> osd_max_object_size;
  md_config_cacher_t<bool> osd_skip_data_digest;

  void enqueue_back(OpQueueItem&& qi);
  void enqueue_front(OpQueueItem&& qi);

  void maybe_inject_dispatch_delay() {
    if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
      if (rand() % 10000 <
          g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
        t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
      }
    }
  }

private:
  // -- superblock --
  ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    std::lock_guard l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    std::lock_guard l(publish_lock);
    superblock = block;
  }

  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    std::lock_guard l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    std::lock_guard l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    std::lock_guard l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre_published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  ceph::condition_variable pre_publish_cond;

public:
  void pre_publish_map(OSDMapRef map) {
    std::lock_guard l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    std::lock_guard l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    epoch_t e = next_osdmap->get_epoch();
    map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    std::lock_guard l(pre_publish_lock);
    map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    ceph_assert(i != map_reservations.end());
    ceph_assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    pre_publish_cond.notify_all();
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    std::unique_lock l{pre_publish_lock};
    ceph_assert(next_osdmap);
    pre_publish_cond.wait(l, [this] {
      auto i = map_reservations.cbegin();
      return (i == map_reservations.cend() ||
              i->first >= next_osdmap->get_epoch());
    });
  }
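  /*
   * Illustrative usage sketch (not part of the upstream header): a sender
   * working from the pre-published map brackets its work with a
   * reservation so the epoch cannot be retired underneath it:
   *
   *   OSDMapRef m = service.get_nextmap_reserved();  // hypothetical caller
   *   if (m) {
   *     // ... look up the peer and send using epoch m->get_epoch() ...
   *     service.release_map(m);
   *   }
   *
   * await_reserved_maps() then lets the publisher block until all
   * reservations on epochs older than next_osdmap have drained.
   */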
  OSDMapRef get_next_osdmap() {
    std::lock_guard l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    return next_osdmap;
  }

private:
  Mutex peer_map_epoch_lock;
  map<int, epoch_t> peer_map_epoch;
public:
  epoch_t get_peer_epoch(int p);
  epoch_t note_peer_epoch(int p, epoch_t e);
  void forget_peer_epoch(int p, epoch_t e);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                     OSDSuperblock& superblock);
  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
  void share_map_peer(int peer, Connection *con,
                      OSDMapRef map = OSDMapRef());

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() const;

private:
  // -- scrub scheduling --
  Mutex sched_scrub_lock;
  int scrubs_pending;
  int scrubs_active;

public:
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// a time scheduled for scrub; the scrub could be delayed if system
    /// load is too high or it fails to fall within the scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
                      const utime_t& timestamp,
                      double pool_scrub_min_interval = 0,
                      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
  set<ScrubJob> sched_scrub_pg;

  /// @returns the scrub_reg_stamp used for unregistering the scrub job
  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
                       double pool_scrub_max_interval, bool must) {
    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
                   must);
    std::lock_guard l(sched_scrub_lock);
    sched_scrub_pg.insert(scrub);
    return scrub.sched_time;
  }
  void unreg_pg_scrub(spg_t pgid, utime_t t) {
    std::lock_guard l(sched_scrub_lock);
    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
    ceph_assert(removed);
  }
  bool first_scrub_stamp(ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
    *out = *iter;
    return true;
  }
  bool next_scrub_stamp(const ScrubJob& next,
                        ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }

  void dumps_scrub(Formatter *f) {
    ceph_assert(f != nullptr);
    std::lock_guard l(sched_scrub_lock);

    f->open_array_section("scrubs");
    for (const auto &i: sched_scrub_pg) {
      f->open_object_section("scrub");
      f->dump_stream("pgid") << i.pgid;
      f->dump_stream("sched_time") << i.sched_time;
      f->dump_stream("deadline") << i.deadline;
      f->dump_bool("forced", i.sched_time == i.deadline);
      f->close_section();
    }
    f->close_section();
  }

  bool can_inc_scrubs_pending();
  bool inc_scrubs_pending();
  void inc_scrubs_active(bool reserved);
  void dec_scrubs_pending();
  void dec_scrubs_active();

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
  void handle_misdirected_op(PG *pg, OpRequestRef op);


private:
  // -- agent shared state --
  Mutex agent_lock;
  Cond agent_cond;
  map<uint64_t, set<PGRef> > agent_queue;
  set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
  set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  Mutex agent_timer_lock;
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false; // inserting higher-priority queue
    set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.Signal();
    nq.insert(pg);
  }

  void _dequeue(PG *pg, uint64_t old_priority) {
    set<PGRef>& oq = agent_queue[old_priority];
    set<PGRef>::iterator p = oq.find(pg);
    ceph_assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    std::lock_guard l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    std::lock_guard l(agent_lock);
    ceph_assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    std::lock_guard l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    std::lock_guard l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    agent_cond.Signal();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ++agent_ops;
    ceph_assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    ceph_assert(agent_ops > 0);
    --agent_ops;
    ceph_assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.Signal();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    std::lock_guard l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    std::lock_guard l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count++;
  }

  void agent_dec_high_count() {
    std::lock_guard l(agent_lock);
    flush_mode_high_count--;
  }

private:
  /// throttle promotion attempts
  std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability in thousandths. one word.
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless! we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   // no throttle (promote)
  }
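  /*
   * Illustrative usage sketch (not part of the upstream header): a
   * hypothetical promotion path consults the throttle first and reports
   * completion so promote_throttle_recalibrate() can adjust the
   * probability:
   *
   *   if (!service.promote_throttle()) {
   *     // ... perform the promotion ...
   *     service.promote_finish(bytes_promoted); // bytes_promoted: caller's count
   *   }
   */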
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  Objecter *objecter;
  int m_objecter_finishers;
  vector<Finisher*> objecter_finishers;

  // -- Watch --
  Mutex watch_lock;
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    std::lock_guard l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }

  // -- Recovery/Backfill Request Scheduling --
  Mutex recovery_request_lock;
  SafeTimer recovery_request_timer;

  // For async recovery sleep
  bool recovery_needs_sleep = true;
  utime_t recovery_schedule_time = utime_t();

  // For recovery & scrub & snap
  Mutex sleep_lock;
  SafeTimer sleep_timer;

  // -- tids --
  // for ops i issue
  std::atomic<unsigned int> last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t> local_reserver;
  AsyncReserver<spg_t> remote_reserver;

  // -- pg merge --
  Mutex merge_lock = {"OSD::merge_lock"};
  map<pg_t,eversion_t> ready_to_merge_source;   // pg -> version
  map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;  // pg -> (version,les,lec)
  set<pg_t> not_ready_to_merge_source;
  map<pg_t,pg_t> not_ready_to_merge_target;
  set<pg_t> sent_ready_to_merge_source;

  void set_ready_to_merge_source(PG *pg,
                                 eversion_t version);
  void set_ready_to_merge_target(PG *pg,
                                 eversion_t version,
                                 epoch_t last_epoch_started,
                                 epoch_t last_epoch_clean);
  void set_not_ready_to_merge_source(pg_t source);
  void set_not_ready_to_merge_target(pg_t target, pg_t source);
  void clear_ready_to_merge(PG *pg);
  void send_ready_to_merge();
  void _send_ready_to_merge();
  void clear_sent_ready_to_merge();
  void prune_sent_ready_to_merge(OSDMapRef& osdmap);

  // -- pg_temp --
private:
  Mutex pg_temp_lock;
  struct pg_temp_t {
    vector<int> acting;
    bool forced = false;
  };
  map<pg_t, pg_temp_t> pg_temp_wanted;
  map<pg_t, pg_temp_t> pg_temp_pending;
  void _sent_pg_temp();
  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
  void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
                          bool forced = false);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
  set<pg_t> pg_created;
  void send_pg_created(pg_t pgid);
  void prune_pg_created();
  void send_pg_created();

  AsyncReserver<spg_t> snap_reserver;
  void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
  void queue_for_snap_trim(PG *pg);
  void queue_for_scrub(PG *pg, bool with_high_priority);
  void queue_for_pg_delete(spg_t pgid, epoch_t e);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

private:
  // -- pg recovery and associated throttling --
  Mutex recovery_lock;
  list<pair<epoch_t, PGRef> > awaiting_throttle;

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  map<spg_t, set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes);
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    std::lock_guard l(recovery_lock);
    return recovery_paused;
  }
  void unpause_recovery() {
    std::lock_guard l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    std::lock_guard l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);
    awaiting_throttle.remove_if(
      [pg](decltype(awaiting_throttle)::const_reference awaiting) {
        return awaiting.second.get() == pg;
      });
  }
  // delayed pg activation
  void queue_for_recovery(PG *pg) {
    std::lock_guard l(recovery_lock);

    if (pg->is_forced_recovery_or_backfill()) {
      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }
  void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
    std::lock_guard l(recovery_lock);
    _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
  }

  // osd map cache (past osd maps)
  Mutex map_cache_lock;
  SharedLRU<epoch_t, const OSDMap> map_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;

  /// final pg_num values for recently deleted pools
  map<int64_t,int> deleted_pool_pg_nums;

  OSDMapRef try_get_map(epoch_t e);
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    ceph_assert(ret);
    return ret;
  }
  OSDMapRef add_map(OSDMap *o) {
    std::lock_guard l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);

  void add_map_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _add_map_bl(e, bl);
  }
  void _add_map_bl(epoch_t e, bufferlist& bl);
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, bufferlist& bl);

  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _add_map_inc_bl(e, bl);
  }
  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
  bool get_inc_map_bl(epoch_t e, bufferlist& bl);

  /// get last pg_num before a pool was deleted (if any)
  int get_deleted_pool_pg_num(int64_t pool);

  void store_deleted_pool_pg_num(int64_t pool, int pg_num) {
    std::lock_guard l(map_cache_lock);
    deleted_pool_pg_nums[pool] = pg_num;
  }

  /// get pg_num from newmap or, if the pool was deleted, from the last map the pool existed in
  int get_possibly_deleted_pool_pg_num(OSDMapRef newmap,
                                       int64_t pool) {
    if (newmap->have_pg_pool(pool)) {
      return newmap->get_pg_num(pool);
    }
    return get_deleted_pool_pg_num(pool);
  }

  /// identify split child pgids over an osdmap interval
  void identify_splits_and_merges(
    OSDMapRef old_map,
    OSDMapRef new_map,
    spg_t pgid,
    set<pair<spg_t,epoch_t>> *new_children,
    set<pair<spg_t,epoch_t>> *merge_pgs);

  void need_heartbeat_peer_update();

  void init();
  void final_init();
  void start_shutdown();
  void shutdown_reserver();
  void shutdown();

  // -- stats --
  Mutex stat_lock;
  osd_stat_t osd_stat;
  uint32_t seq = 0;

  void set_statfs(const struct store_statfs_t &stbuf,
                  osd_alert_list_t& alerts);
  osd_stat_t set_osd_stat(vector<int>& hb_peers, int num_pgs);
  void inc_osd_stat_repaired(void);
  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
  osd_stat_t get_osd_stat() {
    std::lock_guard l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  uint64_t get_osd_stat_seq() {
    std::lock_guard l(stat_lock);
    return osd_stat.seq;
  }

  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable Mutex full_status_lock;
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio, physical_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
  bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
public:
  void check_full_status(float ratio, float pratio);
  s_names recalc_full_state(float ratio, float pratio, string &inject);
  bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
  bool check_failsafe_full(DoutPrefixProvider *dpp) const;
  bool check_full(DoutPrefixProvider *dpp) const;
  bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
  bool check_backfill_full(DoutPrefixProvider *dpp) const;
  bool check_nearfull(DoutPrefixProvider *dpp) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);
  bool check_osdmap_full(const set<pg_shard_t> &missing_on);


  // -- epochs --
private:
  mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs. Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                  const epoch_t *_bind_epoch);
  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  void request_osdmap_update(epoch_t e);

  // -- stopping --
  Mutex is_stopping_lock;
  Cond is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING };
  std::atomic<int> state{NOT_STOPPING};
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();


#ifdef PG_DEBUG_REFS
  Mutex pgid_lock;
  map<spg_t, int> pgid_tracker;
  map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg);
  void remove_pgid(spg_t pgid, PG *pg);
  void dump_live_pgids();
#endif

  explicit OSDService(OSD *osd);
  ~OSDService();
};


enum class io_queue {
  prioritized,
  weightedpriority,
  mclock_opclass,
  mclock_client,
};
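
/*
 * Illustrative note (an assumption, not stated in this header): the queue
 * variant an OSD uses is selected at startup from the osd_op_queue config
 * option ("wpq" for weightedpriority, plus "prioritized", "mclock_opclass"
 * and "mclock_client"); the OSDShard constructor below maps the chosen
 * io_queue value to a concrete OpQueue implementation.
 */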


/*

  Each PG slot includes queues for events that are processing and/or waiting
  for a PG to be materialized in the slot.

  These are the constraints:

  - client ops must remain ordered by client, regardless of map epoch
  - peering messages/events from peers must remain ordered by peer
  - peering messages and client ops need not be ordered relative to each other

  - some peering events can create a pg (e.g., notify)
  - the query peering event can proceed when a PG doesn't exist

  Implementation notes:

  - everybody waits for split.  If the OSD has the parent PG it will instantiate
    the PGSlot early and mark it waiting_for_split.  Everything will wait until
    the parent is able to commit the split operation and the child PG's are
    materialized in the child slots.

  - every event has an epoch property and will wait for the OSDShard to catch
    up to that epoch.  For example, if we get a peering event from a future
    epoch, the event will wait in the slot until the local OSD has caught up.
    (We should be judicious in specifying the required epoch [by, e.g., setting
    it to the same_interval_since epoch] so that we don't wait for epochs that
    don't affect the given PG.)

  - we maintain two separate wait lists, *waiting* and *waiting_peering*.  The
    OpQueueItem has an is_peering() bool to determine which we use.  Waiting
    peering events are queued up by the epoch they require.

  - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
    materialized the PG), we wake *all* waiting items.  (This could be optimized,
    probably, but we don't bother.)  We always requeue peering items ahead of
    client ops.

  - some peering events are marked !peering_requires_pg (PGQuery).  if we do
    not have a PG these are processed immediately (under the shard lock).

  - if we do not have a PG present, we check if the slot maps to the current
    host.  if so, we either queue the item and wait for the PG to materialize,
    or (if the event is a pg creating event like PGNotify), we materialize the
    PG.

  - when we advance the osdmap on the OSDShard, we scan pg slots and
    discard any slots with no pg (and not waiting_for_split) that no
    longer map to the current host.

 */
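
/*
 * Illustrative sketch (not part of the upstream header) of the routing
 * described above, in hypothetical pseudocode:
 *
 *   OSDShardPGSlot *slot = shard->pg_slots[pgid].get();
 *   if (slot->pg) {
 *     slot->to_process.push_back(std::move(item)); // pg exists: run it
 *   } else if (item.is_peering() && !item.peering_requires_pg()) {
 *     // e.g. a query: process immediately under the shard lock
 *   } else if (item.is_peering()) {
 *     slot->waiting_peering[epoch_required].push_back(std::move(item));
 *   } else {
 *     slot->waiting.push_back(std::move(item));    // wait for map + pg
 *   }
 */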

struct OSDShardPGSlot {
  PGRef pg;                      ///< pg reference
  deque<OpQueueItem> to_process; ///< ordered items for this slot
  int num_running = 0;           ///< _process threads doing pg lookup/lock

  deque<OpQueueItem> waiting;    ///< waiting for pg (or map + pg)

  /// waiting for map (peering evt)
  map<epoch_t,deque<OpQueueItem>> waiting_peering;

  /// incremented by wake_pg_waiters; indicates racing _process threads
  /// should bail out (their op has been requeued)
  uint64_t requeue_seq = 0;

  /// waiting for split child to materialize in these epoch(s)
  set<epoch_t> waiting_for_split;

  epoch_t epoch = 0;
  boost::intrusive::set_member_hook<> pg_epoch_item;

  /// waiting for a merge (source or target) by this epoch
  epoch_t waiting_for_merge_epoch = 0;
};

struct OSDShard {
  const unsigned shard_id;
  CephContext *cct;
  OSD *osd;

  string shard_name;

  string sdata_wait_lock_name;
  ceph::mutex sdata_wait_lock;
  ceph::condition_variable sdata_cond;

  string osdmap_lock_name;
  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
  OSDMapRef shard_osdmap;

  OSDMapRef get_osdmap() {
    std::lock_guard l(osdmap_lock);
    return shard_osdmap;
  }

  string shard_lock_name;
  ceph::mutex shard_lock;   ///< protects remaining members below

  /// map of slots for each spg_t.  maintains ordering of items dequeued
  /// from pqueue while _process thread drops shard lock to acquire the
  /// pg lock.  stale slots are removed by consume_map.
  unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;

  struct pg_slot_compare_by_epoch {
    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
      return l.epoch < r.epoch;
    }
  };

  /// maintain an ordering of pg slots by pg epoch
  boost::intrusive::multiset<
    OSDShardPGSlot,
    boost::intrusive::member_hook<
      OSDShardPGSlot,
      boost::intrusive::set_member_hook<>,
      &OSDShardPGSlot::pg_epoch_item>,
    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
  int waiting_for_min_pg_epoch = 0;
  ceph::condition_variable min_pg_epoch_cond;

  /// priority queue
  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;

  bool stop_waiting = false;

  ContextQueue context_queue;

  void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
    unsigned priority = item.get_priority();
    unsigned cost = item.get_cost();
    if (priority >= cutoff)
      pqueue->enqueue_strict_front(
        item.get_owner(),
        priority, std::move(item));
    else
      pqueue->enqueue_front(
        item.get_owner(),
        priority, cost, std::move(item));
  }

  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
  void _detach_pg(OSDShardPGSlot *slot);

  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
  epoch_t get_min_pg_epoch();
  void wait_min_pg_epoch(epoch_t need);

  /// return newest epoch we are waiting for
  epoch_t get_max_waiting_epoch();

  /// push osdmap into shard
  void consume_map(
    OSDMapRef& osdmap,
    unsigned *pushes_to_free);

  void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);

  void identify_splits_and_merges(
    const OSDMapRef& as_of_osdmap,
    set<pair<spg_t,epoch_t>> *split_children,
    set<pair<spg_t,epoch_t>> *merge_pgs);
  void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
  void prime_splits(const OSDMapRef& as_of_osdmap,
                    set<pair<spg_t,epoch_t>> *pgids);
  void prime_merges(const OSDMapRef& as_of_osdmap,
                    set<pair<spg_t,epoch_t>> *merge_pgs);
  void register_and_wake_split_child(PG *pg);
  void unprime_split_children(spg_t parent, unsigned old_pg_num);

  OSDShard(
    int id,
    CephContext *cct,
    OSD *osd,
    uint64_t max_tok_per_prio, uint64_t min_cost,
    io_queue opqueue)
    : shard_id(id),
      cct(cct),
      osd(osd),
      shard_name(string("OSDShard.") + stringify(id)),
      sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
      sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
      osdmap_lock_name(shard_name + "::osdmap_lock"),
      osdmap_lock{make_mutex(osdmap_lock_name)},
      shard_lock_name(shard_name + "::shard_lock"),
      shard_lock{make_mutex(shard_lock_name)},
      context_queue(sdata_wait_lock, sdata_cond) {
    if (opqueue == io_queue::weightedpriority) {
      pqueue = std::make_unique<
        WeightedPriorityQueue<OpQueueItem,uint64_t>>(
          max_tok_per_prio, min_cost);
    } else if (opqueue == io_queue::prioritized) {
      pqueue = std::make_unique<
        PrioritizedQueue<OpQueueItem,uint64_t>>(
          max_tok_per_prio, min_cost);
    } else if (opqueue == io_queue::mclock_opclass) {
      pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
    } else if (opqueue == io_queue::mclock_client) {
      pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
    }
  }
};

class OSD : public Dispatcher,
            public md_config_obs_t {
  /** OSD **/
  Mutex osd_lock;        // global lock
  SafeTimer tick_timer;  // safe timer (osd_lock)

  // Tick timer for the things that do not need osd_lock
  Mutex tick_timer_lock;
  SafeTimer tick_timer_without_osd_lock;
  std::string gss_ktfile_client{};

public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set <std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:

  const double OSD_TICK_INTERVAL = { 1.0 };
  double get_tick_interval() const;

  Messenger *cluster_messenger;
  Messenger *client_messenger;
  Messenger *objecter_messenger;
  MonClient *monc; // check the "monc helpers" list before accessing directly
  MgrClient mgrc;
  PerfCounters *logger;
  PerfCounters *recoverystate_perf;
  ObjectStore *store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  int last_require_osd_release = 0;

  int numa_node = -1;
  size_t numa_cpu_set_size = 0;
  cpu_set_t numa_cpu_set;

  bool store_is_rotational = true;
  bool journal_is_rotational = true;

  ZTracer::Endpoint trace_endpoint;
  void create_logger();
  void create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features();

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  bool asok_command(std::string_view admin_command, const cmdmap_t& cmdmap,
                    std::string_view format, std::ostream& ss);

public:
  ClassHandler *class_handler = nullptr;
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("snapmapper"),
        0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    stringstream ss;
    ss << "pglog_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    stringstream ss;
    ss << "pginfo_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }

  static ghobject_t make_final_pool_info_oid(int64_t pool) {
    return ghobject_t(
      hobject_t(
        sobject_t(
          object_t(string("final_pool_") + stringify(pool)),
          CEPH_NOSNAP)));
  }

  static ghobject_t make_pg_num_history_oid() {
    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
  }

  static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
                                          coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD. Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();


private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- config settings --
  float m_osd_pg_epoch_max_lag_factor;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic<int> state{STATE_INITIALIZING};

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:

  ShardedThreadPool osd_op_tp;
  ThreadPool command_tp;

  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);

  Mutex session_waiting_lock;
  set<SessionRef> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(set<SessionRef> *out) {
    std::lock_guard l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(SessionRef session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.insert(session);
  }
  void clear_session_waiting_on_map(SessionRef session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.erase(session);
  }
  void dispatch_sessions_waiting_on_map() {
    set<SessionRef> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
      std::lock_guard l{(*i)->session_dispatch_lock};
      SessionRef session = *i;
      dispatch_session_waiting(session, osdmap);
    }
  }
  void session_handle_reset(SessionRef session) {
    std::lock_guard l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   * the monitors; false if an outstanding request that encompasses it is
   * sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  Mutex osdmap_subscribe_lock;
  epoch_t latest_subscribed_epoch{0};

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                  ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;          ///< time we sent our first ping request
    utime_t last_tx;           ///< last time we sent a ping request
    utime_t last_rx_front;     ///< last time we got a ping reply on the front side
    utime_t last_rx_back;      ///< last time we got a ping reply on the back side
    epoch_t epoch;             ///< most recent epoch we wanted this peer
    /// number of connections we send and receive heartbeat pings/replies on
    static constexpr int HEARTBEAT_MAX_CONN = 2;
    /// history of inflight pings, arranged by the timestamp we sent them
    /// send time -> deadline -> remaining replies
    map<utime_t, pair<utime_t, int>> ping_history;

    bool is_unhealthy(utime_t now) {
      if (ping_history.empty()) {
        /// we haven't sent a ping yet or we have got all replies;
        /// either way we are safe and healthy for now
        return false;
      }

      utime_t oldest_deadline = ping_history.begin()->second.first;
      return now > oldest_deadline;
    }

    bool is_healthy(utime_t now) {
      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
        // only declare ourselves healthy once we have received the first
        // replies from both the front and back connections
        return false;
      }
      return !is_unhealthy(now);
    }
  };
  /// state attached to outgoing heartbeat connections
  struct HeartbeatSession : public RefCountedObject {
    int peer;
    explicit HeartbeatSession(int p) : peer(p) {}
  };
  Mutex heartbeat_lock;
  map<int, int> debug_heartbeat_drops_remaining;
  Cond heartbeat_cond;
  bool heartbeat_stop;
  std::atomic<bool> heartbeat_need_update;
  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
  utime_t last_mon_heartbeat;
  Messenger *hb_front_client_messenger;
  Messenger *hb_back_client_messenger;
  Messenger *hb_front_server_messenger;
  Messenger *hb_back_server_messenger;
  utime_t last_heartbeat_resample;  ///< last time we chose random peers in waiting-for-healthy state
  double daily_loadavg;

  void _add_heartbeat_peer(int p);
  void _remove_heartbeat_peer(int p);
  bool heartbeat_reset(Connection *con);
  void maybe_update_heartbeat_peers();
  void reset_heartbeat_peers();
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  void heartbeat_kick() {
    std::lock_guard l(heartbeat_lock);
    heartbeat_cond.Signal();
  }

  struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
      osd->heartbeat_entry();
      return 0;
    }
  } heartbeat_thread;

public:
  bool heartbeat_dispatch(Message *m);

  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
        return true;
      default:
        return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    int ms_handle_authentication(Connection *con) override {
      return true;
    }
    bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override {
      // some pre-nautilus OSDs get confused if you include an
      // authorizer but they are not expecting it. do not try to authorize
      // heartbeat connections until all OSDs are nautilus.
      if (osd->get_osdmap()->require_osd_release >= CEPH_RELEASE_NAUTILUS) {
        return osd->ms_get_authorizer(dest_type, authorizer);
      }
      return false;
    }
    KeyStore *ms_get_auth1_authorizer_keystore() override {
      return osd->ms_get_auth1_authorizer_keystore();
    }
  } heartbeat_dispatcher;

private:
  // -- waiters --
  list<OpRequestRef> finished;

  void take_waiters(list<OpRequestRef>& ls) {
    ceph_assert(osd_lock.is_locked());
    finished.splice(finished.end(), ls);
  }
  void do_waiters();

  // -- op tracking --
  OpTracker op_tracker;
  void test_ops(std::string command, std::string args, ostream& ss);
  friend class TestOpsSocketHook;
  TestOpsSocketHook *test_ops_hook;
  friend struct C_FinishSplits;
  friend struct C_OpenPGs;

  // -- op queue --
  friend std::ostream& operator<<(std::ostream& out, const io_queue& q);

  const io_queue op_queue;
public:
  const unsigned int op_prio_cutoff;
protected:

  /*
   * The ordered op delivery chain is:
   *
   *   fast dispatch -> pqueue back
   *                    pqueue front <-> to_process back
   *                                     to_process front -> RunVis(item)
   *                                                      <- queue_front()
   *
   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
   * pushed back up into to_process and/or pqueue while order is preserved.
   *
   * Multiple worker threads can operate on each shard.
   *
   * Under normal circumstances, num_running == to_process.size().  There are
   * two times when that is not true: (1) when waiting_for_pg == true and
   * to_process is accumulating requests that are waiting for the pg to be
   * instantiated; in that case they will all get requeued together by
   * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
   * is false, and the items have already been requeued.
   */
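  /*
   * Illustrative sketch (not part of the upstream header): a worker
   * thread's pass over one shard follows the chain above, in hypothetical
   * pseudocode:
   *
   *   // shard_lock held
   *   OpQueueItem item = sdata->pqueue->dequeue();
   *   slot->to_process.push_back(std::move(item));
   *   // drop shard_lock, take the PG lock, run the front item; if it
   *   // cannot run yet it is pushed back via queue_front(), preserving
   *   // per-client and per-peer ordering.
   */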
  friend class PGOpItem;
  friend class PGPeeringItem;
  friend class PGRecovery;
  friend class PGDelete;

  class ShardedOpWQ
    : public ShardedThreadPool::ShardedWQ<OpQueueItem>
  {
    OSD *osd;

  public:
    ShardedOpWQ(OSD *o,
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
      : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
        osd(o) {
    }

    void _add_slot_waiter(
      spg_t token,
      OSDShardPGSlot *slot,
      OpQueueItem&& qi);

    /// try to do some work
    void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;

    /// enqueue a new item
    void _enqueue(OpQueueItem&& item) override;

    /// requeue an old item (at the front of the line)
    void _enqueue_front(OpQueueItem&& item) override;

    void return_waiting_threads() override {
      for (uint32_t i = 0; i < osd->num_shards; i++) {
        OSDShard* sdata = osd->shards[i];
        ceph_assert(sdata != nullptr);
        std::scoped_lock l{sdata->sdata_wait_lock};
        sdata->stop_waiting = true;
        sdata->sdata_cond.notify_all();
      }
    }

    void stop_return_waiting_threads() override {
      for (uint32_t i = 0; i < osd->num_shards; i++) {
        OSDShard* sdata = osd->shards[i];
        ceph_assert(sdata != nullptr);
        std::scoped_lock l{sdata->sdata_wait_lock};
        sdata->stop_waiting = false;
      }
    }
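    // [Editor's note, not an upstream comment] stop_waiting is the flag that
    // idle worker threads blocked on sdata_cond check: setting it and calling
    // notify_all() makes every waiter return to the thread pool (e.g. so the
    // pool can be drained or resized), and clearing it afterwards lets the
    // threads go back to blocking on the condition variable as usual.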

    void dump(Formatter *f) {
      for (uint32_t i = 0; i < osd->num_shards; i++) {
        auto &&sdata = osd->shards[i];

        char queue_name[32] = {0};
        snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
        ceph_assert(sdata != nullptr);

        std::scoped_lock l{sdata->shard_lock};
        f->open_object_section(queue_name);
        sdata->pqueue->dump(f);
        f->close_section();
      }
    }

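    // [Editor's note, not an upstream comment] only thread_index values below
    // num_shards service a shard's context_queue in _process(), so only those
    // threads need to treat pending contexts as making the shard non-empty;
    // the extra threads (thread_index >= num_shards) consult the pqueue alone.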
    bool is_shard_empty(uint32_t thread_index) override {
      uint32_t shard_index = thread_index % osd->num_shards;
      auto &&sdata = osd->shards[shard_index];
      ceph_assert(sdata);
      std::lock_guard l(sdata->shard_lock);
      if (thread_index < osd->num_shards) {
        return sdata->pqueue->empty() && sdata->context_queue.empty();
      } else {
        return sdata->pqueue->empty();
      }
    }

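    // [Editor's note, not an upstream comment] Context::complete(r) runs the
    // callback and then deletes the Context object itself, so after the loop
    // below the pointers held in `oncommits` are dangling and must not be
    // touched again.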
    void handle_oncommits(list<Context*>& oncommits) {
      for (auto p : oncommits) {
        p->complete(0);
      }
    }
  } op_shardedwq;


  void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
  void dequeue_op(
    PGRef pg, OpRequestRef op,
    ThreadPool::TPHandle &handle);

  void enqueue_peering_evt(
    spg_t pgid,
    PGPeeringEventRef ref);
  void enqueue_peering_evt_front(
    spg_t pgid,
    PGPeeringEventRef ref);
  void dequeue_peering_evt(
    OSDShard *sdata,
    PG *pg,
    PGPeeringEventRef ref,
    ThreadPool::TPHandle& handle);

  void dequeue_delete(
    OSDShard *sdata,
    PG *pg,
    epoch_t epoch,
    ThreadPool::TPHandle& handle);

  friend class PG;
  friend class OSDShard;
  friend class PrimaryLogPG;


 protected:

  // -- osd map --
  OSDMapRef osdmap;
  OSDMapRef get_osdmap() {
    return osdmap;
  }
  epoch_t get_osdmap_epoch() const {
    return osdmap ? osdmap->get_epoch() : 0;
  }

  pool_pg_num_history_t pg_num_history;

  utime_t had_map_since;
  RWLock map_lock;
  list<OpRequestRef> waiting_for_osdmap;
  deque<utime_t> osd_markdown_log;

  friend struct send_map_on_destruct;

  void wait_for_new_map(OpRequestRef op);
  void handle_osd_map(class MOSDMap *m);
  void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
  void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
  void note_down_osd(int osd);
  void note_up_osd(int osd);
  friend class C_OnMapCommit;

  bool advance_pg(
    epoch_t advance_to,
    PG *pg,
    ThreadPool::TPHandle &handle,
    PG::RecoveryCtx *rctx);
  void consume_map();
  void activate_map();

  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  OSDMapRef add_map(OSDMap *o) {
    return service.add_map(o);
  }
  void add_map_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_bl(e, bl);
  }
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    return service.get_map_bl(e, bl);
  }
  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_inc_bl(e, bl);
  }

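  // [Editor's note, not an upstream comment] the *_bl variants cache the
  // encoded (bufferlist) form of a map: add_map_bl/get_map_bl deal with full
  // OSDMap epochs, while add_map_inc_bl stores the incremental delta that
  // takes epoch e-1 to e, which is the cheap form to forward to peers that
  // are only slightly behind.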
public:
  // -- shards --
  vector<OSDShard*> shards;
  uint32_t num_shards = 0;

  void inc_num_pgs() {
    ++num_pgs;
  }
  void dec_num_pgs() {
    --num_pgs;
  }
  int get_num_pgs() const {
    return num_pgs;
  }

protected:
  Mutex merge_lock = {"OSD::merge_lock"};
  /// merge epoch -> target pgid -> source pgid -> pg
  map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;

  bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
                        unsigned need);

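  // [Editor's sketch, not upstream code] e.g. if a pool's pg_num shrinks
  // from 4 to 2 at epoch 123, source pg 1.3 merges into target pg 1.1, and
  // the waiter map would be populated along the lines of:
  //
  //   merge_waiters[123][spg_t(pg_t(1, 1))][spg_t(pg_t(3, 1))] = source_pg;
  //
  // once all `need` sources for a target have arrived, the merge proceeds.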
  // -- placement groups --
  std::atomic<size_t> num_pgs = {0};

  std::mutex pending_creates_lock;
  using create_from_osd_t = std::pair<pg_t, bool /* is primary */>;
  std::set<create_from_osd_t> pending_creates_from_osd;
  unsigned pending_creates_from_mon = 0;

  PGRecoveryStats pg_recovery_stats;

  PGRef _lookup_pg(spg_t pgid);
  PGRef _lookup_lock_pg(spg_t pgid);
  void register_pg(PGRef pg);
  bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);

  void _get_pgs(vector<PGRef> *v, bool clear_too=false);
  void _get_pgids(vector<spg_t> *v);

public:
  PGRef lookup_lock_pg(spg_t pgid);

  std::set<int64_t> get_mapped_pools();

protected:
  PG* _make_pg(OSDMapRef createmap, spg_t pgid);

  bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
                             spg_t pgid, bool is_mon_create);
  void resume_creating_pg();

  void load_pgs();

  /// build initial pg history and intervals on create
  void build_initial_pg_history(
    spg_t pgid,
    epoch_t created,
    utime_t created_stamp,
    pg_history_t *h,
    PastIntervals *pi);

  epoch_t last_pg_create_epoch;

  void handle_pg_create(OpRequestRef op);

  void split_pgs(
    PG *parent,
    const set<spg_t> &childpgids, set<PGRef> *out_pgs,
    OSDMapRef curmap,
    OSDMapRef nextmap,
    PG::RecoveryCtx *rctx);
  void _finish_splits(set<PGRef>& pgs);

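  // [Editor's sketch, not upstream code] e.g. raising a pool's pg_num from
  // 2 to 4 splits pg 1.1 into itself plus child 1.3 (new seed = old seed +
  // old pg_num), so split_pgs() would be invoked with
  //
  //   childpgids == { spg_t(pg_t(3, 1)) }
  //
  // while the parent PG object keeps serving 1.1.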
  // == monitor interaction ==
  Mutex mon_report_lock;
  utime_t last_mon_report;
  Finisher boot_finisher;

  // -- boot --
  void start_boot();
  void _got_mon_epochs(epoch_t oldest, epoch_t newest);
  void _preboot(epoch_t oldest, epoch_t newest);
  void _send_boot();
  void _collect_metadata(map<string,string> *pmeta);

  void start_waiting_for_healthy();
  bool _is_healthy();

  void send_full_update();

  friend struct C_OSD_GetVersion;

  // -- alive --
  epoch_t up_thru_wanted;

  void queue_want_up_thru(epoch_t want);
  void send_alive();

  // -- full map requests --
  epoch_t requested_full_first, requested_full_last;

  void request_full_map(epoch_t first, epoch_t last);
  void rerequest_full_maps() {
    epoch_t first = requested_full_first;
    epoch_t last = requested_full_last;
    requested_full_first = 0;
    requested_full_last = 0;
    request_full_map(first, last);
  }
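  // [Editor's note, not an upstream comment] the two fields are zeroed
  // before re-issuing the request because request_full_map() treats a
  // non-zero requested_full_first as "a request is already in flight" and
  // only extends the range; clearing them first forces a fresh request for
  // the whole [first, last] span, e.g. after reconnecting to a monitor.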
  void got_full_map(epoch_t e);

  // -- failures --
  map<int,utime_t> failure_queue;
  map<int,pair<utime_t,entity_addrvec_t> > failure_pending;

  void requeue_failures();
  void send_failures();
  void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
  void cancel_pending_failures();

  ceph::coarse_mono_clock::time_point last_sent_beacon;
  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
  epoch_t min_last_epoch_clean = 0;
  // which pgs were scanned for min_lec
  std::vector<pg_t> min_last_epoch_clean_pgs;
  void send_beacon(const ceph::coarse_mono_clock::time_point& now);

  ceph_tid_t get_tid() {
    return service.get_tid();
  }

  // -- generic pg peering --
  PG::RecoveryCtx create_context();
  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
                        ThreadPool::TPHandle *handle = NULL);
  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
                                    ThreadPool::TPHandle *handle = NULL);
  void discard_context(PG::RecoveryCtx &ctx);
  void do_notifies(map<int,
                   vector<pair<pg_notify_t, PastIntervals> > >&
                   notify_list,
                   OSDMapRef map);
  void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
                  OSDMapRef map);
  void do_infos(map<int,
                vector<pair<pg_notify_t, PastIntervals> > >& info_map,
                OSDMapRef map);

  bool require_mon_peer(const Message *m);
  bool require_mon_or_mgr_peer(const Message *m);
  bool require_osd_peer(const Message *m);
  /**
   * Verifies that we were alive in the given epoch, and that
   * we still are.
   */
  bool require_self_aliveness(const Message *m, epoch_t alive_since);
  /**
   * Verifies that the OSD who sent the given op has the same
   * address as in the given map.
   * @pre op was sent by an OSD using the cluster messenger
   */
  bool require_same_peer_instance(const Message *m, OSDMapRef& map,
                                  bool is_fast_dispatch);

  bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                                 bool is_fast_dispatch);

  void handle_fast_pg_create(MOSDPGCreate2 *m);
  void handle_fast_pg_query(MOSDPGQuery *m);
  void handle_pg_query_nopg(const MQuery& q);
  void handle_fast_pg_notify(MOSDPGNotify *m);
  void handle_pg_notify_nopg(const MNotifyRec& q);
  void handle_fast_pg_info(MOSDPGInfo *m);
  void handle_fast_pg_remove(MOSDPGRemove *m);

public:
  // used by OSDShard
  PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
protected:

  void handle_fast_force_recovery(MOSDForceRecovery *m);

  // -- commands --
  struct Command {
    vector<string> cmd;
    ceph_tid_t tid;
    bufferlist indata;
    ConnectionRef con;

    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
      : cmd(c), tid(t), indata(bl), con(co) {}
  };
  list<Command*> command_queue;
  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
    OSD *osd;
    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}

    bool _empty() override {
      return osd->command_queue.empty();
    }
    bool _enqueue(Command *c) override {
      osd->command_queue.push_back(c);
      return true;
    }
    void _dequeue(Command *pg) override {
      ceph_abort();
    }
    Command *_dequeue() override {
      if (osd->command_queue.empty())
        return NULL;
      Command *c = osd->command_queue.front();
      osd->command_queue.pop_front();
      return c;
    }
    void _process(Command *c, ThreadPool::TPHandle &) override {
      osd->osd_lock.lock();
      if (osd->is_stopping()) {
        osd->osd_lock.unlock();
        delete c;
        return;
      }
      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
      osd->osd_lock.unlock();
      delete c;
    }
    void _clear() override {
      while (!osd->command_queue.empty()) {
        Command *c = osd->command_queue.front();
        osd->command_queue.pop_front();
        delete c;
      }
    }
  } command_wq;
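  // [Editor's note, not an upstream comment] the pointer-taking
  // _dequeue(Command*) deliberately calls ceph_abort(): commands are only
  // ever drained FIFO through the nullary _dequeue(), so removing an
  // arbitrary queued command is an unsupported operation for this work
  // queue, and reaching that path would indicate a bug.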

  void handle_command(class MMonCommand *m);
  void handle_command(class MCommand *m);
  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
  int _do_command(
    Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
    bufferlist& odata, stringstream& ss, stringstream& ds);


  // -- pg recovery --
  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                   ThreadPool::TPHandle &handle);


  // -- scrubbing --
  void sched_scrub();
  bool scrub_random_backoff();
  bool scrub_load_below_threshold();
  bool scrub_time_permit(utime_t now);

  // -- status reporting --
  MPGStats *collect_pg_stats();
  std::vector<DaemonHealthMetric> get_health_metrics();


private:
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
    case CEPH_MSG_OSD_OP:
    case CEPH_MSG_OSD_BACKOFF:
    case MSG_OSD_SCRUB2:
    case MSG_OSD_FORCE_RECOVERY:
    case MSG_MON_COMMAND:
    case MSG_OSD_PG_CREATE2:
    case MSG_OSD_PG_QUERY:
    case MSG_OSD_PG_INFO:
    case MSG_OSD_PG_NOTIFY:
    case MSG_OSD_PG_LOG:
    case MSG_OSD_PG_TRIM:
    case MSG_OSD_PG_REMOVE:
    case MSG_OSD_BACKFILL_RESERVE:
    case MSG_OSD_RECOVERY_RESERVE:
    case MSG_OSD_REPOP:
    case MSG_OSD_REPOPREPLY:
    case MSG_OSD_PG_PUSH:
    case MSG_OSD_PG_PULL:
    case MSG_OSD_PG_PUSH_REPLY:
    case MSG_OSD_PG_SCAN:
    case MSG_OSD_PG_BACKFILL:
    case MSG_OSD_PG_BACKFILL_REMOVE:
    case MSG_OSD_EC_WRITE:
    case MSG_OSD_EC_WRITE_REPLY:
    case MSG_OSD_EC_READ:
    case MSG_OSD_EC_READ_REPLY:
    case MSG_OSD_SCRUB_RESERVE:
    case MSG_OSD_REP_SCRUB:
    case MSG_OSD_REP_SCRUBMAP:
    case MSG_OSD_PG_UPDATE_LOG_MISSING:
    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    case MSG_OSD_PG_RECOVERY_DELETE:
    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
      return true;
    default:
      return false;
    }
  }
  void ms_fast_dispatch(Message *m) override;
  bool ms_dispatch(Message *m) override;
  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override;
  void ms_handle_connect(Connection *con) override;
  void ms_handle_fast_connect(Connection *con) override;
  void ms_handle_fast_accept(Connection *con) override;
  int ms_handle_authentication(Connection *con) override;
  KeyStore *ms_get_auth1_authorizer_keystore() override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override;

  io_queue get_io_queue() const {
    if (cct->_conf->osd_op_queue == "debug_random") {
      static io_queue index_lookup[] = { io_queue::prioritized,
                                         io_queue::weightedpriority,
                                         io_queue::mclock_opclass,
                                         io_queue::mclock_client };
      srand(time(NULL));
      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
      return index_lookup[which];
    } else if (cct->_conf->osd_op_queue == "prioritized") {
      return io_queue::prioritized;
    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
      return io_queue::mclock_opclass;
    } else if (cct->_conf->osd_op_queue == "mclock_client") {
      return io_queue::mclock_client;
    } else {
      // default / catch-all is 'wpq'
      return io_queue::weightedpriority;
    }
  }
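  // [Editor's note, not an upstream comment] this mirrors the osd_op_queue
  // config option: "wpq" (and any unrecognized value) selects the weighted
  // priority queue, while "debug_random" rolls one of the four
  // implementations with rand() -- a testing aid only, since
  // srand(time(NULL)) reseeds on every call and gives coarse randomness.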

  unsigned int get_io_prio_cut() const {
    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
      srand(time(NULL));
      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
      return CEPH_MSG_PRIO_HIGH;
    } else {
      // default / catch-all is 'low'
      return CEPH_MSG_PRIO_LOW;
    }
  }
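  // [Editor's note, not an upstream comment] the value returned here seeds
  // op_prio_cutoff above: ops whose priority is at or above the cutoff are
  // queued in strict priority order, while the rest go through fair,
  // cost-based queueing -- so "high" widens the strict band and "low" (the
  // default) keeps most client ops under fair queueing.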

 public:
  /* internal and external can point to the same messenger; they will still
   * be cleaned up properly */
  OSD(CephContext *cct_,
      ObjectStore *store_,
      int id,
      Messenger *internal,
      Messenger *external,
      Messenger *hb_front_client,
      Messenger *hb_back_client,
      Messenger *hb_front_server,
      Messenger *hb_back_server,
      Messenger *osdc_messenger,
      MonClient *mc, const std::string &dev, const std::string &jdev);
  ~OSD() override;

  // static bits
  static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami);

  /* remove any non-user xattrs from a map of them */
  void filter_xattrs(map<string, bufferptr>& attrs) {
    for (map<string, bufferptr>::iterator iter = attrs.begin();
         iter != attrs.end();
         ) {
      if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
        attrs.erase(iter++);
      else ++iter;
    }
  }

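  // [Editor's sketch, not upstream code] user xattrs are stored with a
  // leading '_', so given
  //
  //   attrs = { "_user.rgw.acl": ..., "_": ..., "snapset": ... }
  //
  // filter_xattrs() erases "snapset" (no leading '_') and the bare "_"
  // (size() == 1), leaving only "_user.rgw.acl".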
private:
  int mon_cmd_maybe_osd_create(string &cmd);
  int update_crush_device_class();
  int update_crush_location();

  static int write_meta(CephContext *cct,
                        ObjectStore *store,
                        uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);

  void handle_scrub(struct MOSDScrub *m);
  void handle_fast_scrub(struct MOSDScrub2 *m);
  void handle_osd_ping(class MOSDPing *m);

  int init_op_flags(OpRequestRef& op);

  int get_num_op_shards();
  int get_num_op_threads();

  float get_osd_recovery_sleep();
  float get_osd_delete_sleep();

  void probe_smart(const string& devid, ostream& ss);

public:
  static int peek_meta(ObjectStore *store,
                       string *magic,
                       uuid_d *cluster_fsid,
                       uuid_d *osd_fsid,
                       int *whoami,
                       int *min_osd_release);


  // startup/shutdown
  int pre_init();
  int init();
  void final_init();

  int enable_disable_fuse(bool stop);
  int set_numa_affinity();

  void suicide(int exitcode);
  int shutdown();

  void handle_signal(int signum);

  /// check whether we can throw out an op from a disconnected client
  static bool op_is_discardable(const MOSDOp *m);

public:
  OSDService service;
  friend class OSDService;

private:
  void set_perf_queries(
    const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
  void get_perf_reports(
    std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);

  Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
  std::list<OSDPerfMetricQuery> m_perf_queries;
  std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
};


std::ostream& operator<<(std::ostream& out, const io_queue& q);


// compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];

#endif // CEPH_OSD_H