1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OSD_H
16#define CEPH_OSD_H
17
18#include "PG.h"
19
20#include "msg/Dispatcher.h"
21
22#include "common/Mutex.h"
23#include "common/RWLock.h"
24#include "common/Timer.h"
25#include "common/WorkQueue.h"
26#include "common/AsyncReserver.h"
27#include "common/ceph_context.h"
 28#include "common/config_cacher.h"
29#include "common/zipkin_trace.h"
30
31#include "mgr/MgrClient.h"
32
33#include "os/ObjectStore.h"
34#include "OSDCap.h"
35
36#include "auth/KeyRing.h"
 37
38#include "osd/ClassHandler.h"
39
40#include "include/CompatSet.h"
41
42#include "OpRequest.h"
43#include "Session.h"
44
 45#include "osd/OpQueueItem.h"
 46
47#include <atomic>
48#include <map>
49#include <memory>
 50#include <string>
51
52#include "include/unordered_map.h"
53
54#include "common/shared_cache.hpp"
55#include "common/simple_cache.hpp"
56#include "common/sharedptr_registry.hpp"
57#include "common/WeightedPriorityQueue.h"
58#include "common/PrioritizedQueue.h"
59#include "osd/mClockOpClassQueue.h"
60#include "osd/mClockClientQueue.h"
 61#include "messages/MOSDOp.h"
62#include "common/EventTrace.h"
63
64#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
65
66/*
67
68 lock ordering for pg map
69
70 PG::lock
71 ShardData::lock
72 OSD::pg_map_lock
73
74 */
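/*
  Illustrative sketch, not part of the build ('pg' and 'shard' are hypothetical
  handles): a thread that needs more than one of these locks must take them in
  the order listed above, outermost first, e.g.

    pg->lock();                              // PG::lock first
    {
      std::lock_guard l{shard->shard_lock};  // then the shard-level lock
      // ... touch per-shard pg state ...
    }
    pg->unlock();

  and must never acquire PG::lock while already holding one of the later locks.
*/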
75
76enum {
77 l_osd_first = 10000,
78 l_osd_op_wip,
79 l_osd_op,
80 l_osd_op_inb,
81 l_osd_op_outb,
82 l_osd_op_lat,
83 l_osd_op_process_lat,
84 l_osd_op_prepare_lat,
85 l_osd_op_r,
86 l_osd_op_r_outb,
87 l_osd_op_r_lat,
88 l_osd_op_r_lat_outb_hist,
89 l_osd_op_r_process_lat,
90 l_osd_op_r_prepare_lat,
91 l_osd_op_w,
92 l_osd_op_w_inb,
93 l_osd_op_w_lat,
94 l_osd_op_w_lat_inb_hist,
95 l_osd_op_w_process_lat,
96 l_osd_op_w_prepare_lat,
97 l_osd_op_rw,
98 l_osd_op_rw_inb,
99 l_osd_op_rw_outb,
100 l_osd_op_rw_lat,
101 l_osd_op_rw_lat_inb_hist,
102 l_osd_op_rw_lat_outb_hist,
103 l_osd_op_rw_process_lat,
104 l_osd_op_rw_prepare_lat,
105
106 l_osd_op_before_queue_op_lat,
107 l_osd_op_before_dequeue_op_lat,
108
109 l_osd_sop,
110 l_osd_sop_inb,
111 l_osd_sop_lat,
112 l_osd_sop_w,
113 l_osd_sop_w_inb,
114 l_osd_sop_w_lat,
115 l_osd_sop_pull,
116 l_osd_sop_pull_lat,
117 l_osd_sop_push,
118 l_osd_sop_push_inb,
119 l_osd_sop_push_lat,
120
121 l_osd_pull,
122 l_osd_push,
123 l_osd_push_outb,
124
125 l_osd_rop,
 126 l_osd_rbytes,
127
128 l_osd_loadavg,
129 l_osd_cached_crc,
130 l_osd_cached_crc_adjusted,
131 l_osd_missed_crc,
132
133 l_osd_pg,
134 l_osd_pg_primary,
135 l_osd_pg_replica,
136 l_osd_pg_stray,
 137 l_osd_pg_removing,
138 l_osd_hb_to,
139 l_osd_map,
140 l_osd_mape,
141 l_osd_mape_dup,
142
143 l_osd_waiting_for_map,
144
145 l_osd_map_cache_hit,
146 l_osd_map_cache_miss,
147 l_osd_map_cache_miss_low,
148 l_osd_map_cache_miss_low_avg,
149 l_osd_map_bl_cache_hit,
150 l_osd_map_bl_cache_miss,
151
152 l_osd_stat_bytes,
153 l_osd_stat_bytes_used,
154 l_osd_stat_bytes_avail,
155
156 l_osd_copyfrom,
157
158 l_osd_tier_promote,
159 l_osd_tier_flush,
160 l_osd_tier_flush_fail,
161 l_osd_tier_try_flush,
162 l_osd_tier_try_flush_fail,
163 l_osd_tier_evict,
164 l_osd_tier_whiteout,
165 l_osd_tier_dirty,
166 l_osd_tier_clean,
167 l_osd_tier_delay,
168 l_osd_tier_proxy_read,
169 l_osd_tier_proxy_write,
170
171 l_osd_agent_wake,
172 l_osd_agent_skip,
173 l_osd_agent_flush,
174 l_osd_agent_evict,
175
176 l_osd_object_ctx_cache_hit,
177 l_osd_object_ctx_cache_total,
178
179 l_osd_op_cache_hit,
180 l_osd_tier_flush_lat,
181 l_osd_tier_promote_lat,
182 l_osd_tier_r_lat,
183
184 l_osd_pg_info,
185 l_osd_pg_fastinfo,
186 l_osd_pg_biginfo,
187
188 l_osd_last,
189};
190
191// RecoveryState perf counters
192enum {
193 rs_first = 20000,
194 rs_initial_latency,
195 rs_started_latency,
196 rs_reset_latency,
197 rs_start_latency,
198 rs_primary_latency,
199 rs_peering_latency,
200 rs_backfilling_latency,
201 rs_waitremotebackfillreserved_latency,
202 rs_waitlocalbackfillreserved_latency,
203 rs_notbackfilling_latency,
204 rs_repnotrecovering_latency,
205 rs_repwaitrecoveryreserved_latency,
206 rs_repwaitbackfillreserved_latency,
207 rs_reprecovering_latency,
208 rs_activating_latency,
209 rs_waitlocalrecoveryreserved_latency,
210 rs_waitremoterecoveryreserved_latency,
211 rs_recovering_latency,
212 rs_recovered_latency,
213 rs_clean_latency,
214 rs_active_latency,
215 rs_replicaactive_latency,
216 rs_stray_latency,
217 rs_getinfo_latency,
218 rs_getlog_latency,
219 rs_waitactingchange_latency,
220 rs_incomplete_latency,
221 rs_down_latency,
222 rs_getmissing_latency,
223 rs_waitupthru_latency,
224 rs_notrecovering_latency,
225 rs_last,
226};
227
228class Messenger;
229class Message;
230class MonClient;
231class PerfCounters;
232class ObjectStore;
233class FuseStore;
234class OSDMap;
235class MLog;
236class Objecter;
11fdf7f2 237class KeyStore;
238
239class Watch;
240class PrimaryLogPG;
241
7c673cae 242class TestOpsSocketHook;
11fdf7f2 243struct C_FinishSplits;
244struct C_OpenPGs;
245class LogChannel;
246class CephContext;
247class MOSDOp;
248
249class MOSDPGCreate2;
250class MOSDPGQuery;
251class MOSDPGNotify;
252class MOSDPGInfo;
253class MOSDPGRemove;
254class MOSDForceRecovery;
255
256class OSD;
257
258class OSDService {
259public:
260 OSD *osd;
261 CephContext *cct;
11fdf7f2 262 ObjectStore::CollectionHandle meta_ch;
263 const int whoami;
264 ObjectStore *&store;
265 LogClient &log_client;
266 LogChannelRef clog;
267 PGRecoveryStats &pg_recovery_stats;
268private:
269 Messenger *&cluster_messenger;
270 Messenger *&client_messenger;
271public:
272 PerfCounters *&logger;
273 PerfCounters *&recoverystate_perf;
274 MonClient *&monc;
275 ClassHandler *&class_handler;
276
277 md_config_cacher_t<Option::size_t> osd_max_object_size;
278 md_config_cacher_t<bool> osd_skip_data_digest;
279
280 void enqueue_back(OpQueueItem&& qi);
281 void enqueue_front(OpQueueItem&& qi);
282
283 void maybe_inject_dispatch_delay() {
11fdf7f2 284 if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
7c673cae 285 if (rand() % 10000 <
11fdf7f2 286 g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
7c673cae 287 utime_t t;
11fdf7f2 288 t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
289 t.sleep();
290 }
291 }
292 }
293
294private:
295 // -- superblock --
11fdf7f2 296 ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
297 OSDSuperblock superblock;
298
299public:
300 OSDSuperblock get_superblock() {
11fdf7f2 301 std::lock_guard l(publish_lock);
302 return superblock;
303 }
304 void publish_superblock(const OSDSuperblock &block) {
11fdf7f2 305 std::lock_guard l(publish_lock);
306 superblock = block;
307 }
308
309 int get_nodeid() const { return whoami; }
310
311 std::atomic<epoch_t> max_oldest_map;
312private:
313 OSDMapRef osdmap;
314
315public:
316 OSDMapRef get_osdmap() {
11fdf7f2 317 std::lock_guard l(publish_lock);
318 return osdmap;
319 }
320 epoch_t get_osdmap_epoch() {
11fdf7f2 321 std::lock_guard l(publish_lock);
322 return osdmap ? osdmap->get_epoch() : 0;
323 }
324 void publish_map(OSDMapRef map) {
11fdf7f2 325 std::lock_guard l(publish_lock);
326 osdmap = map;
327 }
328
329 /*
330 * osdmap - current published map
 331 * next_osdmap - pre-published map that is about to be published.
332 *
333 * We use the next_osdmap to send messages and initiate connections,
334 * but only if the target is the same instance as the one in the map
335 * epoch the current user is working from (i.e., the result is
336 * equivalent to what is in next_osdmap).
337 *
338 * This allows the helpers to start ignoring osds that are about to
339 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
340 * down, without worrying about reopening connections from threads
341 * working from old maps.
342 */
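/*
  Illustrative sketch, not part of the build ('service', 'peer' and 'm' are
  hypothetical): the intended use of the reservation helpers below when
  messaging a peer from the pre-published map.

    OSDMapRef next = service.get_nextmap_reserved();  // pins next_osdmap's epoch
    if (next) {
      ConnectionRef con = service.get_con_osd_cluster(peer, next->get_epoch());
      if (con)
        service.send_message_osd_cluster(m, con);
      service.release_map(next);  // drop the reservation so await_reserved_maps()
    }                             // can make progress
*/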
343private:
344 OSDMapRef next_osdmap;
11fdf7f2 345 ceph::condition_variable pre_publish_cond;
346
347public:
348 void pre_publish_map(OSDMapRef map) {
11fdf7f2 349 std::lock_guard l(pre_publish_lock);
350 next_osdmap = std::move(map);
351 }
352
353 void activate_map();
354 /// map epochs reserved below
355 map<epoch_t, unsigned> map_reservations;
356
357 /// gets ref to next_osdmap and registers the epoch as reserved
358 OSDMapRef get_nextmap_reserved() {
11fdf7f2 359 std::lock_guard l(pre_publish_lock);
360 if (!next_osdmap)
361 return OSDMapRef();
362 epoch_t e = next_osdmap->get_epoch();
363 map<epoch_t, unsigned>::iterator i =
364 map_reservations.insert(make_pair(e, 0)).first;
365 i->second++;
366 return next_osdmap;
367 }
368 /// releases reservation on map
369 void release_map(OSDMapRef osdmap) {
11fdf7f2 370 std::lock_guard l(pre_publish_lock);
371 map<epoch_t, unsigned>::iterator i =
372 map_reservations.find(osdmap->get_epoch());
373 ceph_assert(i != map_reservations.end());
374 ceph_assert(i->second > 0);
375 if (--(i->second) == 0) {
376 map_reservations.erase(i);
377 }
11fdf7f2 378 pre_publish_cond.notify_all();
379 }
380 /// blocks until there are no reserved maps prior to next_osdmap
381 void await_reserved_maps() {
382 std::unique_lock l{pre_publish_lock};
383 ceph_assert(next_osdmap);
384 pre_publish_cond.wait(l, [this] {
385 auto i = map_reservations.cbegin();
386 return (i == map_reservations.cend() ||
387 i->first >= next_osdmap->get_epoch());
388 });
389 }
390 OSDMapRef get_next_osdmap() {
391 std::lock_guard l(pre_publish_lock);
392 if (!next_osdmap)
393 return OSDMapRef();
394 return next_osdmap;
395 }
396
397private:
398 Mutex peer_map_epoch_lock;
399 map<int, epoch_t> peer_map_epoch;
400public:
401 epoch_t get_peer_epoch(int p);
402 epoch_t note_peer_epoch(int p, epoch_t e);
403 void forget_peer_epoch(int p, epoch_t e);
404
405 void send_map(class MOSDMap *m, Connection *con);
406 void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
407 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
408 OSDSuperblock& superblock);
409 bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
410 const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
411 void share_map(entity_name_t name, Connection *con, epoch_t epoch,
412 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
413 void share_map_peer(int peer, Connection *con,
414 OSDMapRef map = OSDMapRef());
415
416 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
417 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
418 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
419 void send_message_osd_cluster(Message *m, Connection *con) {
420 con->send_message(m);
421 }
422 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
423 con->send_message(m);
424 }
425 void send_message_osd_client(Message *m, Connection *con) {
426 con->send_message(m);
427 }
428 void send_message_osd_client(Message *m, const ConnectionRef& con) {
429 con->send_message(m);
430 }
11fdf7f2 431 entity_name_t get_cluster_msgr_name() const;
432
433private:
434 // -- scrub scheduling --
435 Mutex sched_scrub_lock;
436 int scrubs_local;
437 int scrubs_remote;
438
439public:
440 struct ScrubJob {
441 CephContext* cct;
442 /// pg to be scrubbed
443 spg_t pgid;
 444 /// the time the scrub is scheduled for; the actual scrub can be delayed if
 445 /// system load is too high or the time falls outside the configured scrub hours
446 utime_t sched_time;
447 /// the hard upper bound of scrub time
448 utime_t deadline;
449 ScrubJob() : cct(nullptr) {}
450 explicit ScrubJob(CephContext* cct, const spg_t& pg,
451 const utime_t& timestamp,
452 double pool_scrub_min_interval = 0,
453 double pool_scrub_max_interval = 0, bool must = true);
454 /// order the jobs by sched_time
455 bool operator<(const ScrubJob& rhs) const;
456 };
457 set<ScrubJob> sched_scrub_pg;
458
 459 /// @returns the scrub_reg_stamp used to unregister the scrub job
460 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
461 double pool_scrub_max_interval, bool must) {
462 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
463 must);
11fdf7f2 464 std::lock_guard l(sched_scrub_lock);
465 sched_scrub_pg.insert(scrub);
466 return scrub.sched_time;
467 }
468 void unreg_pg_scrub(spg_t pgid, utime_t t) {
11fdf7f2 469 std::lock_guard l(sched_scrub_lock);
7c673cae 470 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
11fdf7f2 471 ceph_assert(removed);
472 }
473 bool first_scrub_stamp(ScrubJob *out) {
11fdf7f2 474 std::lock_guard l(sched_scrub_lock);
475 if (sched_scrub_pg.empty())
476 return false;
477 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
478 *out = *iter;
479 return true;
480 }
481 bool next_scrub_stamp(const ScrubJob& next,
482 ScrubJob *out) {
11fdf7f2 483 std::lock_guard l(sched_scrub_lock);
484 if (sched_scrub_pg.empty())
485 return false;
486 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
487 if (iter == sched_scrub_pg.cend())
488 return false;
489 ++iter;
490 if (iter == sched_scrub_pg.cend())
491 return false;
492 *out = *iter;
493 return true;
494 }
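  /*
    Illustrative sketch, not part of the build: first_scrub_stamp() and
    next_scrub_stamp() together walk the schedule in sched_time order.

      ScrubJob job;
      for (bool ok = first_scrub_stamp(&job); ok; ok = next_scrub_stamp(job, &job)) {
        // consider scrubbing job.pgid once job.sched_time has passed
      }
  */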
495
496 void dumps_scrub(Formatter *f) {
497 ceph_assert(f != nullptr);
498 std::lock_guard l(sched_scrub_lock);
499
500 f->open_array_section("scrubs");
501 for (const auto &i: sched_scrub_pg) {
502 f->open_object_section("scrub");
503 f->dump_stream("pgid") << i.pgid;
504 f->dump_stream("sched_time") << i.sched_time;
505 f->dump_stream("deadline") << i.deadline;
494da23a 506 f->dump_bool("forced", i.sched_time == PG::Scrubber::scrub_must_stamp());
507 f->close_section();
508 }
509 f->close_section();
510 }
511
512 bool can_inc_scrubs();
513 bool inc_scrubs_local();
514 void dec_scrubs_local();
515 bool inc_scrubs_remote();
516 void dec_scrubs_remote();
517 void dump_scrub_reservations(Formatter *f);
518
519 void reply_op_error(OpRequestRef op, int err);
520 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
521 void handle_misdirected_op(PG *pg, OpRequestRef op);
522
523
524private:
525 // -- agent shared state --
526 Mutex agent_lock;
527 Cond agent_cond;
528 map<uint64_t, set<PGRef> > agent_queue;
529 set<PGRef>::iterator agent_queue_pos;
530 bool agent_valid_iterator;
531 int agent_ops;
 532 int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
533 set<hobject_t> agent_oids;
534 bool agent_active;
535 struct AgentThread : public Thread {
536 OSDService *osd;
537 explicit AgentThread(OSDService *o) : osd(o) {}
538 void *entry() override {
539 osd->agent_entry();
540 return NULL;
541 }
542 } agent_thread;
543 bool agent_stop_flag;
544 Mutex agent_timer_lock;
545 SafeTimer agent_timer;
546
547public:
548 void agent_entry();
549 void agent_stop();
550
551 void _enqueue(PG *pg, uint64_t priority) {
552 if (!agent_queue.empty() &&
553 agent_queue.rbegin()->first < priority)
554 agent_valid_iterator = false; // inserting higher-priority queue
555 set<PGRef>& nq = agent_queue[priority];
556 if (nq.empty())
557 agent_cond.Signal();
558 nq.insert(pg);
559 }
560
561 void _dequeue(PG *pg, uint64_t old_priority) {
562 set<PGRef>& oq = agent_queue[old_priority];
563 set<PGRef>::iterator p = oq.find(pg);
11fdf7f2 564 ceph_assert(p != oq.end());
565 if (p == agent_queue_pos)
566 ++agent_queue_pos;
567 oq.erase(p);
568 if (oq.empty()) {
569 if (agent_queue.rbegin()->first == old_priority)
570 agent_valid_iterator = false;
571 agent_queue.erase(old_priority);
572 }
573 }
574
575 /// enable agent for a pg
576 void agent_enable_pg(PG *pg, uint64_t priority) {
11fdf7f2 577 std::lock_guard l(agent_lock);
578 _enqueue(pg, priority);
579 }
580
 581 /// adjust priority for an enabled pg
582 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
583 std::lock_guard l(agent_lock);
584 ceph_assert(new_priority != old_priority);
585 _enqueue(pg, new_priority);
586 _dequeue(pg, old_priority);
587 }
588
589 /// disable agent for a pg
590 void agent_disable_pg(PG *pg, uint64_t old_priority) {
11fdf7f2 591 std::lock_guard l(agent_lock);
592 _dequeue(pg, old_priority);
593 }
594
595 /// note start of an async (evict) op
596 void agent_start_evict_op() {
11fdf7f2 597 std::lock_guard l(agent_lock);
598 ++agent_ops;
599 }
600
601 /// note finish or cancellation of an async (evict) op
602 void agent_finish_evict_op() {
603 std::lock_guard l(agent_lock);
604 ceph_assert(agent_ops > 0);
605 --agent_ops;
606 agent_cond.Signal();
607 }
608
609 /// note start of an async (flush) op
610 void agent_start_op(const hobject_t& oid) {
11fdf7f2 611 std::lock_guard l(agent_lock);
7c673cae 612 ++agent_ops;
11fdf7f2 613 ceph_assert(agent_oids.count(oid) == 0);
614 agent_oids.insert(oid);
615 }
616
617 /// note finish or cancellation of an async (flush) op
618 void agent_finish_op(const hobject_t& oid) {
619 std::lock_guard l(agent_lock);
620 ceph_assert(agent_ops > 0);
7c673cae 621 --agent_ops;
11fdf7f2 622 ceph_assert(agent_oids.count(oid) == 1);
623 agent_oids.erase(oid);
624 agent_cond.Signal();
625 }
626
627 /// check if we are operating on an object
628 bool agent_is_active_oid(const hobject_t& oid) {
11fdf7f2 629 std::lock_guard l(agent_lock);
630 return agent_oids.count(oid);
631 }
632
633 /// get count of active agent ops
634 int agent_get_num_ops() {
11fdf7f2 635 std::lock_guard l(agent_lock);
636 return agent_ops;
637 }
638
639 void agent_inc_high_count() {
11fdf7f2 640 std::lock_guard l(agent_lock);
641 flush_mode_high_count ++;
642 }
643
644 void agent_dec_high_count() {
11fdf7f2 645 std::lock_guard l(agent_lock);
646 flush_mode_high_count --;
647 }
648
649private:
650 /// throttle promotion attempts
 651 std::atomic<unsigned int> promote_probability_millis{1000}; ///< promote probability in thousandths; a single word
652 PromoteCounter promote_counter;
653 utime_t last_recalibrate;
654 unsigned long promote_max_objects, promote_max_bytes;
655
656public:
657 bool promote_throttle() {
658 // NOTE: lockless! we rely on the probability being a single word.
659 promote_counter.attempt();
660 if ((unsigned)rand() % 1000 > promote_probability_millis)
661 return true; // yes throttle (no promote)
662 if (promote_max_objects &&
663 promote_counter.objects > promote_max_objects)
664 return true; // yes throttle
665 if (promote_max_bytes &&
666 promote_counter.bytes > promote_max_bytes)
667 return true; // yes throttle
668 return false; // no throttle (promote)
669 }
670 void promote_finish(uint64_t bytes) {
671 promote_counter.finish(bytes);
672 }
673 void promote_throttle_recalibrate();
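  /*
    Illustrative sketch, not part of the build ('bytes_promoted' is
    hypothetical): the intended calling pattern around a cache-tier promotion.

      if (!promote_throttle()) {         // false == not throttled, go ahead
        // ... copy the object up into the cache tier ...
        promote_finish(bytes_promoted);  // feeds the counters recalibration uses
      }
  */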
674
675 // -- Objecter, for tiering reads/writes from/to other OSDs --
676 Objecter *objecter;
677 int m_objecter_finishers;
678 vector<Finisher*> objecter_finishers;
679
680 // -- Watch --
681 Mutex watch_lock;
682 SafeTimer watch_timer;
683 uint64_t next_notif_id;
684 uint64_t get_next_id(epoch_t cur_epoch) {
11fdf7f2 685 std::lock_guard l(watch_lock);
686 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
687 }
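  // e.g. cur_epoch 0x2a with next_notif_id 7 yields 0x0000002a00000007: the
  // epoch fills the high 32 bits, the per-OSD counter the low 32 bits.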
688
689 // -- Recovery/Backfill Request Scheduling --
690 Mutex recovery_request_lock;
691 SafeTimer recovery_request_timer;
692
693 // For async recovery sleep
694 bool recovery_needs_sleep = true;
695 utime_t recovery_schedule_time = utime_t();
696
697 // For recovery & scrub & snap
698 Mutex sleep_lock;
699 SafeTimer sleep_timer;
31f18b77 700
701 // -- tids --
702 // for ops i issue
11fdf7f2 703 std::atomic<unsigned int> last_tid{0};
704 ceph_tid_t get_tid() {
705 return (ceph_tid_t)last_tid++;
706 }
707
708 // -- backfill_reservation --
709 Finisher reserver_finisher;
710 AsyncReserver<spg_t> local_reserver;
711 AsyncReserver<spg_t> remote_reserver;
712
713 // -- pg merge --
714 Mutex merge_lock = {"OSD::merge_lock"};
715 map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
716 map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
717 set<pg_t> not_ready_to_merge_source;
718 map<pg_t,pg_t> not_ready_to_merge_target;
719 set<pg_t> sent_ready_to_merge_source;
720
721 void set_ready_to_merge_source(PG *pg,
722 eversion_t version);
723 void set_ready_to_merge_target(PG *pg,
724 eversion_t version,
725 epoch_t last_epoch_started,
726 epoch_t last_epoch_clean);
727 void set_not_ready_to_merge_source(pg_t source);
728 void set_not_ready_to_merge_target(pg_t target, pg_t source);
729 void clear_ready_to_merge(PG *pg);
730 void send_ready_to_merge();
731 void _send_ready_to_merge();
732 void clear_sent_ready_to_merge();
733 void prune_sent_ready_to_merge(OSDMapRef& osdmap);
734
735 // -- pg_temp --
736private:
737 Mutex pg_temp_lock;
94b18763 738 struct pg_temp_t {
739 vector<int> acting;
740 bool forced = false;
741 };
742 map<pg_t, pg_temp_t> pg_temp_wanted;
743 map<pg_t, pg_temp_t> pg_temp_pending;
7c673cae 744 void _sent_pg_temp();
94b18763 745 friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
7c673cae 746public:
747 void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
748 bool forced = false);
749 void remove_want_pg_temp(pg_t pgid);
750 void requeue_pg_temp();
751 void send_pg_temp();
752
753 ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock");
754 set<pg_t> pg_created;
7c673cae 755 void send_pg_created(pg_t pgid);
756 void prune_pg_created();
757 void send_pg_created();
31f18b77 758
7c673cae 759 AsyncReserver<spg_t> snap_reserver;
11fdf7f2 760 void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
7c673cae 761 void queue_for_snap_trim(PG *pg);
762 void queue_for_scrub(PG *pg, bool with_high_priority);
763 void queue_for_pg_delete(spg_t pgid, epoch_t e);
764 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
765
766private:
767 // -- pg recovery and associated throttling --
768 Mutex recovery_lock;
769 list<pair<epoch_t, PGRef> > awaiting_throttle;
770
771 utime_t defer_recovery_until;
772 uint64_t recovery_ops_active;
773 uint64_t recovery_ops_reserved;
774 bool recovery_paused;
775#ifdef DEBUG_RECOVERY_OIDS
776 map<spg_t, set<hobject_t> > recovery_oids;
777#endif
778 bool _recover_now(uint64_t *available_pushes);
779 void _maybe_queue_recovery();
780 void _queue_for_recovery(
11fdf7f2 781 pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
782public:
783 void start_recovery_op(PG *pg, const hobject_t& soid);
784 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
785 bool is_recovery_active();
11fdf7f2 786 void release_reserved_pushes(uint64_t pushes);
787 void defer_recovery(float defer_for) {
788 defer_recovery_until = ceph_clock_now();
789 defer_recovery_until += defer_for;
790 }
791 void pause_recovery() {
11fdf7f2 792 std::lock_guard l(recovery_lock);
793 recovery_paused = true;
794 }
795 bool recovery_is_paused() {
11fdf7f2 796 std::lock_guard l(recovery_lock);
797 return recovery_paused;
798 }
799 void unpause_recovery() {
11fdf7f2 800 std::lock_guard l(recovery_lock);
801 recovery_paused = false;
802 _maybe_queue_recovery();
803 }
804 void kick_recovery_queue() {
11fdf7f2 805 std::lock_guard l(recovery_lock);
806 _maybe_queue_recovery();
807 }
808 void clear_queued_recovery(PG *pg) {
809 std::lock_guard l(recovery_lock);
810 awaiting_throttle.remove_if(
811 [pg](decltype(awaiting_throttle)::const_reference awaiting ) {
812 return awaiting.second.get() == pg;
813 });
814 }
815 // delayed pg activation
c07f9fc5 816 void queue_for_recovery(PG *pg) {
11fdf7f2 817 std::lock_guard l(recovery_lock);
c07f9fc5 818
11fdf7f2 819 if (pg->is_forced_recovery_or_backfill()) {
820 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
821 } else {
822 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
823 }
824 _maybe_queue_recovery();
825 }
31f18b77 826 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
11fdf7f2 827 std::lock_guard l(recovery_lock);
828 _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
829 }
7c673cae 830
831 // osd map cache (past osd maps)
832 Mutex map_cache_lock;
833 SharedLRU<epoch_t, const OSDMap> map_cache;
834 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
835 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
836
837 /// final pg_num values for recently deleted pools
838 map<int64_t,int> deleted_pool_pg_nums;
839
840 OSDMapRef try_get_map(epoch_t e);
841 OSDMapRef get_map(epoch_t e) {
842 OSDMapRef ret(try_get_map(e));
11fdf7f2 843 ceph_assert(ret);
844 return ret;
845 }
846 OSDMapRef add_map(OSDMap *o) {
11fdf7f2 847 std::lock_guard l(map_cache_lock);
848 return _add_map(o);
849 }
850 OSDMapRef _add_map(OSDMap *o);
851
852 void add_map_bl(epoch_t e, bufferlist& bl) {
11fdf7f2 853 std::lock_guard l(map_cache_lock);
854 return _add_map_bl(e, bl);
855 }
856 void _add_map_bl(epoch_t e, bufferlist& bl);
857 bool get_map_bl(epoch_t e, bufferlist& bl) {
11fdf7f2 858 std::lock_guard l(map_cache_lock);
859 return _get_map_bl(e, bl);
860 }
861 bool _get_map_bl(epoch_t e, bufferlist& bl);
862
863 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
11fdf7f2 864 std::lock_guard l(map_cache_lock);
865 return _add_map_inc_bl(e, bl);
866 }
867 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
868 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
869
870 /// get last pg_num before a pool was deleted (if any)
871 int get_deleted_pool_pg_num(int64_t pool);
7c673cae 872
873 void store_deleted_pool_pg_num(int64_t pool, int pg_num) {
874 std::lock_guard l(map_cache_lock);
875 deleted_pool_pg_nums[pool] = pg_num;
876 }
7c673cae 877
878 /// get pgnum from newmap or, if pool was deleted, last map pool existed in
879 int get_possibly_deleted_pool_pg_num(OSDMapRef newmap,
880 int64_t pool) {
881 if (newmap->have_pg_pool(pool)) {
882 return newmap->get_pg_num(pool);
883 }
884 return get_deleted_pool_pg_num(pool);
885 }
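  /*
    Illustrative sketch, not part of the build ('pgid' is a hypothetical spg_t):
    split/merge math keeps working across pool deletion by resolving pg_num as

      int pg_num = get_possibly_deleted_pool_pg_num(newmap, pgid.pool());

    which yields either the live pool's pg_num or the final value recorded via
    store_deleted_pool_pg_num() before the pool disappeared.
  */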
886
 887 /// identify split child pgids over an osdmap interval
888 void identify_splits_and_merges(
889 OSDMapRef old_map,
890 OSDMapRef new_map,
891 spg_t pgid,
892 set<pair<spg_t,epoch_t>> *new_children,
893 set<pair<spg_t,epoch_t>> *merge_pgs);
894
895 void need_heartbeat_peer_update();
896
897 void init();
898 void final_init();
899 void start_shutdown();
31f18b77 900 void shutdown_reserver();
901 void shutdown();
902
903 // -- stats --
904 Mutex stat_lock;
905 osd_stat_t osd_stat;
31f18b77 906 uint32_t seq = 0;
7c673cae 907
908 void set_statfs(const struct store_statfs_t &stbuf,
909 osd_alert_list_t& alerts);
910 osd_stat_t set_osd_stat(vector<int>& hb_peers, int num_pgs);
911 void inc_osd_stat_repaired(void);
912 float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
7c673cae 913 osd_stat_t get_osd_stat() {
11fdf7f2 914 std::lock_guard l(stat_lock);
915 ++seq;
916 osd_stat.up_from = up_epoch;
917 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
918 return osd_stat;
919 }
31f18b77 920 uint64_t get_osd_stat_seq() {
11fdf7f2 921 std::lock_guard l(stat_lock);
922 return osd_stat.seq;
923 }
924 void get_hb_pingtime(map<int, osd_stat_t::Interfaces> *pp)
925 {
926 std::lock_guard l(stat_lock);
927 *pp = osd_stat.hb_pingtime;
928 return;
929 }
930
931 // -- OSD Full Status --
932private:
933 friend TestOpsSocketHook;
934 mutable Mutex full_status_lock;
935 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
936 const char *get_full_state_name(s_names s) const {
937 switch (s) {
938 case NONE: return "none";
939 case NEARFULL: return "nearfull";
940 case BACKFILLFULL: return "backfillfull";
941 case FULL: return "full";
942 case FAILSAFE: return "failsafe";
943 default: return "???";
944 }
945 }
946 s_names get_full_state(string type) const {
947 if (type == "none")
948 return NONE;
949 else if (type == "failsafe")
950 return FAILSAFE;
951 else if (type == "full")
952 return FULL;
953 else if (type == "backfillfull")
954 return BACKFILLFULL;
955 else if (type == "nearfull")
956 return NEARFULL;
957 else
958 return INVALID;
959 }
11fdf7f2 960 double cur_ratio, physical_ratio; ///< current utilization
961 mutable int64_t injectfull = 0;
962 s_names injectfull_state = NONE;
963 float get_failsafe_full_ratio();
964 bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const;
965 bool _check_full(DoutPrefixProvider *dpp, s_names type) const;
7c673cae 966public:
967 void check_full_status(float ratio, float pratio);
968 s_names recalc_full_state(float ratio, float pratio, string &inject);
969 bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t);
970 bool check_failsafe_full(DoutPrefixProvider *dpp) const;
971 bool check_full(DoutPrefixProvider *dpp) const;
972 bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t);
973 bool check_backfill_full(DoutPrefixProvider *dpp) const;
974 bool check_nearfull(DoutPrefixProvider *dpp) const;
975 bool is_failsafe_full() const;
976 bool is_full() const;
977 bool is_backfillfull() const;
978 bool is_nearfull() const;
979 bool need_fullness_update(); ///< osdmap state needs update
980 void set_injectfull(s_names type, int64_t count);
981 bool check_osdmap_full(const set<pg_shard_t> &missing_on);
982
983
984 // -- epochs --
985private:
986 mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
987 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
988 epoch_t up_epoch; // _most_recent_ epoch we were marked up
989 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
990public:
991 /**
992 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
993 * can be NULL if you don't care about them.
994 */
995 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
996 epoch_t *_bind_epoch) const;
997 /**
998 * Set the boot, up, and bind epochs. Any NULL params will not be set.
999 */
1000 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1001 const epoch_t *_bind_epoch);
1002 epoch_t get_boot_epoch() const {
1003 epoch_t ret;
1004 retrieve_epochs(&ret, NULL, NULL);
1005 return ret;
1006 }
1007 epoch_t get_up_epoch() const {
1008 epoch_t ret;
1009 retrieve_epochs(NULL, &ret, NULL);
1010 return ret;
1011 }
1012 epoch_t get_bind_epoch() const {
1013 epoch_t ret;
1014 retrieve_epochs(NULL, NULL, &ret);
1015 return ret;
1016 }
1017
1018 void request_osdmap_update(epoch_t e);
1019
1020 // -- stopping --
1021 Mutex is_stopping_lock;
1022 Cond is_stopping_cond;
1023 enum {
1024 NOT_STOPPING,
1025 PREPARING_TO_STOP,
1026 STOPPING };
1027 std::atomic<int> state{NOT_STOPPING};
1028 int get_state() const {
1029 return state;
1030 }
1031 void set_state(int s) {
1032 state = s;
1033 }
1034 bool is_stopping() const {
1035 return state == STOPPING;
1036 }
1037 bool is_preparing_to_stop() const {
1038 return state == PREPARING_TO_STOP;
1039 }
1040 bool prepare_to_stop();
1041 void got_stop_ack();
1042
1043
1044#ifdef PG_DEBUG_REFS
1045 Mutex pgid_lock;
1046 map<spg_t, int> pgid_tracker;
1047 map<spg_t, PG*> live_pgs;
1048 void add_pgid(spg_t pgid, PG *pg);
1049 void remove_pgid(spg_t pgid, PG *pg);
1050 void dump_live_pgids();
1051#endif
1052
1053 explicit OSDService(OSD *osd);
1054 ~OSDService();
1055};
1056
1057
1058enum class io_queue {
1059 prioritized,
1060 weightedpriority,
1061 mclock_opclass,
1062 mclock_client,
1063};
1064
1065
1066/*
1067
1068 Each PG slot includes queues for events that are processing and/or waiting
1069 for a PG to be materialized in the slot.
1070
1071 These are the constraints:
1072
1073 - client ops must remain ordered by client, regardless of map epoch
1074 - peering messages/events from peers must remain ordered by peer
1075 - peering messages and client ops need not be ordered relative to each other
1076
1077 - some peering events can create a pg (e.g., notify)
1078 - the query peering event can proceed when a PG doesn't exist
1079
1080 Implementation notes:
1081
1082 - everybody waits for split. If the OSD has the parent PG it will instantiate
1083 the PGSlot early and mark it waiting_for_split. Everything will wait until
1084 the parent is able to commit the split operation and the child PGs are
1085 materialized in the child slots.
1086
1087 - every event has an epoch property and will wait for the OSDShard to catch
1088 up to that epoch. For example, if we get a peering event from a future
1089 epoch, the event will wait in the slot until the local OSD has caught up.
1090 (We should be judicious in specifying the required epoch [by, e.g., setting
1091 it to the same_interval_since epoch] so that we don't wait for epochs that
1092 don't affect the given PG.)
1093
1094 - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
1095 OpQueueItem has an is_peering() bool to determine which we use. Waiting
1096 peering events are queued up by epoch required.
1097
1098 - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
1099 materialized the PG), we wake *all* waiting items. (This could be optimized,
1100 probably, but we don't bother.) We always requeue peering items ahead of
1101 client ops.
1102
1103 - some peering events are marked !peering_requires_pg (PGQuery). if we do
1104 not have a PG these are processed immediately (under the shard lock).
1105
1106 - if we do not have a PG present, we check whether the slot maps to the current host.
1107 if so, we either queue the item and wait for the PG to materialize, or
1108 (if the event is a pg creating event like PGNotify), we materialize the PG.
1109
1110 - when we advance the osdmap on the OSDShard, we scan pg slots and
1111 discard any slots with no pg (and not waiting_for_split) that no
1112 longer map to the current host.
1113
1114 */
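/*
  Illustrative sketch, not part of the build: the "wait for the OSDShard to
  catch up" rule above, written out. Whether OpQueueItem exposes the required
  epoch via get_map_epoch() is an assumption here.

    if (item.is_peering() && item.get_map_epoch() > shard_osdmap->get_epoch()) {
      // park it under the epoch it needs; consume_map() will requeue it later
      slot->waiting_peering[item.get_map_epoch()].push_back(std::move(item));
      return;
    }
*/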
1115
1116struct OSDShardPGSlot {
1117 PGRef pg; ///< pg reference
1118 deque<OpQueueItem> to_process; ///< order items for this slot
1119 int num_running = 0; ///< _process threads doing pg lookup/lock
1120
1121 deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
1122
1123 /// waiting for map (peering evt)
1124 map<epoch_t,deque<OpQueueItem>> waiting_peering;
1125
1126 /// incremented by wake_pg_waiters; indicates racing _process threads
1127 /// should bail out (their op has been requeued)
1128 uint64_t requeue_seq = 0;
1129
1130 /// waiting for split child to materialize in these epoch(s)
1131 set<epoch_t> waiting_for_split;
1132
1133 epoch_t epoch = 0;
1134 boost::intrusive::set_member_hook<> pg_epoch_item;
1135
1136 /// waiting for a merge (source or target) by this epoch
1137 epoch_t waiting_for_merge_epoch = 0;
1138};
1139
1140struct OSDShard {
1141 const unsigned shard_id;
1142 CephContext *cct;
1143 OSD *osd;
1144
1145 string shard_name;
1146
1147 string sdata_wait_lock_name;
1148 ceph::mutex sdata_wait_lock;
1149 ceph::condition_variable sdata_cond;
1150
1151 string osdmap_lock_name;
1152 ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
1153 OSDMapRef shard_osdmap;
1154
1155 OSDMapRef get_osdmap() {
1156 std::lock_guard l(osdmap_lock);
1157 return shard_osdmap;
1158 }
1159
1160 string shard_lock_name;
1161 ceph::mutex shard_lock; ///< protects remaining members below
1162
1163 /// map of slots for each spg_t. maintains ordering of items dequeued
1164 /// from pqueue while _process thread drops shard lock to acquire the
1165 /// pg lock. stale slots are removed by consume_map.
1166 unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
1167
1168 struct pg_slot_compare_by_epoch {
1169 bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
1170 return l.epoch < r.epoch;
1171 }
1172 };
1173
1174 /// maintain an ordering of pg slots by pg epoch
1175 boost::intrusive::multiset<
1176 OSDShardPGSlot,
1177 boost::intrusive::member_hook<
1178 OSDShardPGSlot,
1179 boost::intrusive::set_member_hook<>,
1180 &OSDShardPGSlot::pg_epoch_item>,
1181 boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
1182 int waiting_for_min_pg_epoch = 0;
1183 ceph::condition_variable min_pg_epoch_cond;
1184
1185 /// priority queue
1186 std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
1187
1188 bool stop_waiting = false;
1189
1190 ContextQueue context_queue;
1191
1192 void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
1193 unsigned priority = item.get_priority();
1194 unsigned cost = item.get_cost();
1195 if (priority >= cutoff)
1196 pqueue->enqueue_strict_front(
1197 item.get_owner(),
1198 priority, std::move(item));
1199 else
1200 pqueue->enqueue_front(
1201 item.get_owner(),
1202 priority, cost, std::move(item));
1203 }
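  // Note: 'cutoff' is the strict-priority threshold. Items at or above it
  // (peering work, for example) bypass cost-based scheduling and are pushed to
  // the strict front of the queue; ordinary client ops stay subject to the
  // cost/priority path.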
1204
1205 void _attach_pg(OSDShardPGSlot *slot, PG *pg);
1206 void _detach_pg(OSDShardPGSlot *slot);
1207
1208 void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
1209 epoch_t get_min_pg_epoch();
1210 void wait_min_pg_epoch(epoch_t need);
1211
1212 /// return newest epoch we are waiting for
1213 epoch_t get_max_waiting_epoch();
1214
1215 /// push osdmap into shard
1216 void consume_map(
1217 OSDMapRef& osdmap,
1218 unsigned *pushes_to_free);
1219
1220 void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
1221
1222 void identify_splits_and_merges(
1223 const OSDMapRef& as_of_osdmap,
1224 set<pair<spg_t,epoch_t>> *split_children,
1225 set<pair<spg_t,epoch_t>> *merge_pgs);
1226 void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
1227 void prime_splits(const OSDMapRef& as_of_osdmap,
1228 set<pair<spg_t,epoch_t>> *pgids);
1229 void prime_merges(const OSDMapRef& as_of_osdmap,
1230 set<pair<spg_t,epoch_t>> *merge_pgs);
1231 void register_and_wake_split_child(PG *pg);
1232 void unprime_split_children(spg_t parent, unsigned old_pg_num);
1233
1234 OSDShard(
1235 int id,
1236 CephContext *cct,
1237 OSD *osd,
1238 uint64_t max_tok_per_prio, uint64_t min_cost,
1239 io_queue opqueue)
1240 : shard_id(id),
1241 cct(cct),
1242 osd(osd),
1243 shard_name(string("OSDShard.") + stringify(id)),
1244 sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
1245 sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
1246 osdmap_lock_name(shard_name + "::osdmap_lock"),
1247 osdmap_lock{make_mutex(osdmap_lock_name)},
1248 shard_lock_name(shard_name + "::shard_lock"),
1249 shard_lock{make_mutex(shard_lock_name)},
1250 context_queue(sdata_wait_lock, sdata_cond) {
1251 if (opqueue == io_queue::weightedpriority) {
1252 pqueue = std::make_unique<
1253 WeightedPriorityQueue<OpQueueItem,uint64_t>>(
1254 max_tok_per_prio, min_cost);
1255 } else if (opqueue == io_queue::prioritized) {
1256 pqueue = std::make_unique<
1257 PrioritizedQueue<OpQueueItem,uint64_t>>(
1258 max_tok_per_prio, min_cost);
1259 } else if (opqueue == io_queue::mclock_opclass) {
1260 pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
1261 } else if (opqueue == io_queue::mclock_client) {
1262 pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
1263 }
1264 }
1265};
1266
1267class OSD : public Dispatcher,
1268 public md_config_obs_t {
1269 /** OSD **/
11fdf7f2 1270 Mutex osd_lock; // global lock
1271 SafeTimer tick_timer; // safe timer (osd_lock)
1272
 1273 // Tick timer for stuff that does not need osd_lock
1274 Mutex tick_timer_lock;
1275 SafeTimer tick_timer_without_osd_lock;
1276 std::string gss_ktfile_client{};
1277
1278public:
1279 // config observer bits
1280 const char** get_tracked_conf_keys() const override;
11fdf7f2 1281 void handle_conf_change(const ConfigProxy& conf,
1282 const std::set <std::string> &changed) override;
1283 void update_log_config();
1284 void check_config();
1285
1286protected:
1287
1288 const double OSD_TICK_INTERVAL = { 1.0 };
1289 double get_tick_interval() const;
7c673cae 1290
1291 Messenger *cluster_messenger;
1292 Messenger *client_messenger;
1293 Messenger *objecter_messenger;
1294 MonClient *monc; // check the "monc helpers" list before accessing directly
1295 MgrClient mgrc;
1296 PerfCounters *logger;
1297 PerfCounters *recoverystate_perf;
1298 ObjectStore *store;
1299#ifdef HAVE_LIBFUSE
1300 FuseStore *fuse_store = nullptr;
1301#endif
1302 LogClient log_client;
1303 LogChannelRef clog;
1304
1305 int whoami;
1306 std::string dev_path, journal_path;
1307
1308 int last_require_osd_release = 0;
1309
1310 int numa_node = -1;
1311 size_t numa_cpu_set_size = 0;
1312 cpu_set_t numa_cpu_set;
1313
31f18b77 1314 bool store_is_rotational = true;
d2e6a577 1315 bool journal_is_rotational = true;
31f18b77 1316
1317 ZTracer::Endpoint trace_endpoint;
1318 void create_logger();
1319 void create_recoverystate_perf();
1320 void tick();
1321 void tick_without_osd_lock();
1322 void _dispatch(Message *m);
1323 void dispatch_op(OpRequestRef op);
1324
11fdf7f2 1325 void check_osdmap_features();
1326
1327 // asok
1328 friend class OSDSocketHook;
1329 class OSDSocketHook *asok_hook;
1330 bool asok_command(std::string_view admin_command, const cmdmap_t& cmdmap,
1331 std::string_view format, std::ostream& ss);
1332
1333public:
1334 ClassHandler *class_handler = nullptr;
1335 int get_nodeid() { return whoami; }
1336
1337 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1338 char foo[20];
1339 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1340 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1341 }
1342 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1343 char foo[22];
1344 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1345 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1346 }
1347
1348 static ghobject_t make_snapmapper_oid() {
1349 return ghobject_t(hobject_t(
1350 sobject_t(
1351 object_t("snapmapper"),
1352 0)));
1353 }
1354
1355 static ghobject_t make_pg_log_oid(spg_t pg) {
1356 stringstream ss;
1357 ss << "pglog_" << pg;
1358 string s;
1359 getline(ss, s);
1360 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1361 }
1362
1363 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1364 stringstream ss;
1365 ss << "pginfo_" << pg;
1366 string s;
1367 getline(ss, s);
1368 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1369 }
1370 static ghobject_t make_infos_oid() {
1371 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1372 return ghobject_t(oid);
1373 }
1374
1375 static ghobject_t make_final_pool_info_oid(int64_t pool) {
1376 return ghobject_t(
1377 hobject_t(
1378 sobject_t(
1379 object_t(string("final_pool_") + stringify(pool)),
1380 CEPH_NOSNAP)));
1381 }
1382
1383 static ghobject_t make_pg_num_history_oid() {
1384 return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
1385 }
1386
1387 static void recursive_remove_collection(CephContext* cct,
1388 ObjectStore *store,
1389 spg_t pgid,
1390 coll_t tmp);
1391
1392 /**
1393 * get_osd_initial_compat_set()
1394 *
1395 * Get the initial feature set for this OSD. Features
1396 * here are automatically upgraded.
1397 *
1398 * Return value: Initial osd CompatSet
1399 */
1400 static CompatSet get_osd_initial_compat_set();
1401
1402 /**
1403 * get_osd_compat_set()
1404 *
1405 * Get all features supported by this OSD
1406 *
1407 * Return value: CompatSet of all supported features
1408 */
1409 static CompatSet get_osd_compat_set();
1410
1411
1412private:
1413 class C_Tick;
1414 class C_Tick_WithoutOSDLock;
1415
1416 // -- config settings --
1417 float m_osd_pg_epoch_max_lag_factor;
1418
1419 // -- superblock --
1420 OSDSuperblock superblock;
1421
1422 void write_superblock();
1423 void write_superblock(ObjectStore::Transaction& t);
1424 int read_superblock();
1425
1426 void clear_temp_objects();
1427
1428 CompatSet osd_compat;
1429
1430 // -- state --
1431public:
1432 typedef enum {
1433 STATE_INITIALIZING = 1,
1434 STATE_PREBOOT,
1435 STATE_BOOTING,
1436 STATE_ACTIVE,
1437 STATE_STOPPING,
1438 STATE_WAITING_FOR_HEALTHY
1439 } osd_state_t;
1440
1441 static const char *get_state_name(int s) {
1442 switch (s) {
1443 case STATE_INITIALIZING: return "initializing";
1444 case STATE_PREBOOT: return "preboot";
1445 case STATE_BOOTING: return "booting";
1446 case STATE_ACTIVE: return "active";
1447 case STATE_STOPPING: return "stopping";
1448 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1449 default: return "???";
1450 }
1451 }
1452
1453private:
11fdf7f2 1454 std::atomic<int> state{STATE_INITIALIZING};
1455
1456public:
1457 int get_state() const {
1458 return state;
1459 }
1460 void set_state(int s) {
1461 state = s;
1462 }
1463 bool is_initializing() const {
1464 return state == STATE_INITIALIZING;
1465 }
1466 bool is_preboot() const {
1467 return state == STATE_PREBOOT;
1468 }
1469 bool is_booting() const {
1470 return state == STATE_BOOTING;
1471 }
1472 bool is_active() const {
1473 return state == STATE_ACTIVE;
1474 }
1475 bool is_stopping() const {
1476 return state == STATE_STOPPING;
1477 }
1478 bool is_waiting_for_healthy() const {
1479 return state == STATE_WAITING_FOR_HEALTHY;
1480 }
1481
1482private:
1483
7c673cae 1484 ShardedThreadPool osd_op_tp;
1485 ThreadPool command_tp;
1486
1487 void get_latest_osdmap();
1488
1489 // -- sessions --
1490private:
11fdf7f2 1491 void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
1492 void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1493
1494 Mutex session_waiting_lock;
11fdf7f2 1495 set<SessionRef> session_waiting_for_map;
1496
1497 /// Caller assumes refs for included Sessions
1498 void get_sessions_waiting_for_map(set<SessionRef> *out) {
1499 std::lock_guard l(session_waiting_lock);
1500 out->swap(session_waiting_for_map);
1501 }
1502 void register_session_waiting_on_map(SessionRef session) {
1503 std::lock_guard l(session_waiting_lock);
1504 session_waiting_for_map.insert(session);
7c673cae 1505 }
1506 void clear_session_waiting_on_map(SessionRef session) {
1507 std::lock_guard l(session_waiting_lock);
1508 session_waiting_for_map.erase(session);
1509 }
1510 void dispatch_sessions_waiting_on_map() {
11fdf7f2 1511 set<SessionRef> sessions_to_check;
7c673cae 1512 get_sessions_waiting_for_map(&sessions_to_check);
11fdf7f2 1513 for (auto i = sessions_to_check.begin();
1514 i != sessions_to_check.end();
1515 sessions_to_check.erase(i++)) {
1516 std::lock_guard l{(*i)->session_dispatch_lock};
1517 SessionRef session = *i;
1518 dispatch_session_waiting(session, osdmap);
1519 }
1520 }
1521 void session_handle_reset(SessionRef session) {
1522 std::lock_guard l(session->session_dispatch_lock);
1523 clear_session_waiting_on_map(session);
1524
1525 session->clear_backoffs();
1526
1527 /* Messages have connection refs, we need to clear the
1528 * connection->session->message->connection
1529 * cycles which result.
1530 * Bug #12338
1531 */
1532 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1533 }
1534
1535private:
1536 /**
1537 * @defgroup monc helpers
1538 * @{
1539 * Right now we only have the one
1540 */
1541
1542 /**
1543 * Ask the Monitors for a sequence of OSDMaps.
1544 *
1545 * @param epoch The epoch to start with when replying
1546 * @param force_request True if this request forces a new subscription to
1547 * the monitors; false if an outstanding request that encompasses it is
1548 * sufficient.
1549 */
1550 void osdmap_subscribe(version_t epoch, bool force_request);
1551 /** @} monc helpers */
1552
1553 Mutex osdmap_subscribe_lock;
1554 epoch_t latest_subscribed_epoch{0};
1555
1556 // -- heartbeat --
1557 /// information about a heartbeat peer
1558 struct HeartbeatInfo {
1559 int peer; ///< peer
1560 ConnectionRef con_front; ///< peer connection (front)
1561 ConnectionRef con_back; ///< peer connection (back)
1562 utime_t first_tx; ///< time we sent our first ping request
1563 utime_t last_tx; ///< last time we sent a ping request
1564 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1565 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1566 epoch_t epoch; ///< most recent epoch we wanted this peer
 1567 /// number of connections over which we send and receive heartbeat pings/replies
1568 static constexpr int HEARTBEAT_MAX_CONN = 2;
 1569 /// history of inflight pings, arranged by the timestamp we sent
1570 /// send time -> deadline -> remaining replies
1571 map<utime_t, pair<utime_t, int>> ping_history;
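    // Illustrative sketch, not part of the build ('grace' is hypothetical):
    // a ping sent at time t is recorded as
    //   ping_history[t] = make_pair(t + grace, HEARTBEAT_MAX_CONN);
    // each front/back reply decrements the reply count, the entry is erased at
    // zero, and is_unhealthy() only inspects the oldest remaining deadline.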
1572
1573 utime_t hb_interval_start;
1574 uint32_t hb_average_count = 0;
1575 uint32_t hb_index = 0;
1576
1577 uint32_t hb_total_back = 0;
1578 uint32_t hb_min_back = UINT_MAX;
1579 uint32_t hb_max_back = 0;
1580 vector<uint32_t> hb_back_pingtime;
1581 vector<uint32_t> hb_back_min;
1582 vector<uint32_t> hb_back_max;
1583
1584 uint32_t hb_total_front = 0;
1585 uint32_t hb_min_front = UINT_MAX;
1586 uint32_t hb_max_front = 0;
1587 vector<uint32_t> hb_front_pingtime;
1588 vector<uint32_t> hb_front_min;
1589 vector<uint32_t> hb_front_max;
1590
1591 bool is_stale(utime_t stale) {
1592 if (ping_history.empty()) {
1593 return false;
1594 }
1595 utime_t oldest_deadline = ping_history.begin()->second.first;
1596 return oldest_deadline <= stale;
1597 }
1598
1599 bool is_unhealthy(utime_t now) {
1600 if (ping_history.empty()) {
1601 /// we haven't sent a ping yet or we have got all replies,
1602 /// in either way we are safe and healthy for now
1603 return false;
1604 }
7c673cae 1605
1606 utime_t oldest_deadline = ping_history.begin()->second.first;
1607 return now > oldest_deadline;
1608 }
1609
1610 bool is_healthy(utime_t now) {
1611 if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
 1612 // don't consider ourselves healthy until we have received the first
 1613 // replies on both the front and back connections
1614 return false;
1615 }
1616 return !is_unhealthy(now);
1617 }
1618 };
1619 /// state attached to outgoing heartbeat connections
1620 struct HeartbeatSession : public RefCountedObject {
1621 int peer;
1622 explicit HeartbeatSession(int p) : peer(p) {}
1623 };
1624 Mutex heartbeat_lock;
1625 map<int, int> debug_heartbeat_drops_remaining;
1626 Cond heartbeat_cond;
1627 bool heartbeat_stop;
11fdf7f2 1628 std::atomic<bool> heartbeat_need_update;
1629 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1630 utime_t last_mon_heartbeat;
1631 Messenger *hb_front_client_messenger;
1632 Messenger *hb_back_client_messenger;
1633 Messenger *hb_front_server_messenger;
1634 Messenger *hb_back_server_messenger;
1635 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1636 double daily_loadavg;
1637
 1638 // Track ping response times using a vector as a circular buffer
1639 // MUST BE A POWER OF 2
1640 const uint32_t hb_vector_size = 16;
1641
1642 void _add_heartbeat_peer(int p);
1643 void _remove_heartbeat_peer(int p);
1644 bool heartbeat_reset(Connection *con);
1645 void maybe_update_heartbeat_peers();
494da23a 1646 void reset_heartbeat_peers(bool all);
1647 bool heartbeat_peers_need_update() {
1648 return heartbeat_need_update.load();
1649 }
1650 void heartbeat_set_peers_need_update() {
1651 heartbeat_need_update.store(true);
1652 }
1653 void heartbeat_clear_peers_need_update() {
1654 heartbeat_need_update.store(false);
1655 }
1656 void heartbeat();
1657 void heartbeat_check();
1658 void heartbeat_entry();
1659 void need_heartbeat_peer_update();
1660
1661 void heartbeat_kick() {
11fdf7f2 1662 std::lock_guard l(heartbeat_lock);
1663 heartbeat_cond.Signal();
1664 }
1665
1666 struct T_Heartbeat : public Thread {
1667 OSD *osd;
1668 explicit T_Heartbeat(OSD *o) : osd(o) {}
1669 void *entry() override {
1670 osd->heartbeat_entry();
1671 return 0;
1672 }
1673 } heartbeat_thread;
1674
1675public:
1676 bool heartbeat_dispatch(Message *m);
1677
1678 struct HeartbeatDispatcher : public Dispatcher {
1679 OSD *osd;
1680 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1681
1682 bool ms_can_fast_dispatch_any() const override { return true; }
1683 bool ms_can_fast_dispatch(const Message *m) const override {
1684 switch (m->get_type()) {
1685 case CEPH_MSG_PING:
1686 case MSG_OSD_PING:
1687 return true;
1688 default:
1689 return false;
1690 }
1691 }
1692 void ms_fast_dispatch(Message *m) override {
1693 osd->heartbeat_dispatch(m);
1694 }
1695 bool ms_dispatch(Message *m) override {
1696 return osd->heartbeat_dispatch(m);
1697 }
1698 bool ms_handle_reset(Connection *con) override {
1699 return osd->heartbeat_reset(con);
1700 }
1701 void ms_handle_remote_reset(Connection *con) override {}
1702 bool ms_handle_refused(Connection *con) override {
1703 return osd->ms_handle_refused(con);
1704 }
11fdf7f2 1705 int ms_handle_authentication(Connection *con) override {
1706 return true;
1707 }
1708 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override {
1709 // some pre-nautilus OSDs get confused if you include an
1710 // authorizer but they are not expecting it. do not try to authorize
1711 // heartbeat connections until all OSDs are nautilus.
1712 if (osd->get_osdmap()->require_osd_release >= CEPH_RELEASE_NAUTILUS) {
1713 return osd->ms_get_authorizer(dest_type, authorizer);
1714 }
1715 return false;
1716 }
1717 KeyStore *ms_get_auth1_authorizer_keystore() override {
1718 return osd->ms_get_auth1_authorizer_keystore();
1719 }
1720 } heartbeat_dispatcher;
1721
1722private:
1723 // -- waiters --
1724 list<OpRequestRef> finished;
1725
1726 void take_waiters(list<OpRequestRef>& ls) {
11fdf7f2 1727 ceph_assert(osd_lock.is_locked());
1728 finished.splice(finished.end(), ls);
1729 }
1730 void do_waiters();
1731
1732 // -- op tracking --
1733 OpTracker op_tracker;
1734 void test_ops(std::string command, std::string args, ostream& ss);
1735 friend class TestOpsSocketHook;
1736 TestOpsSocketHook *test_ops_hook;
11fdf7f2 1737 friend struct C_FinishSplits;
1738 friend struct C_OpenPGs;
1739
1740 // -- op queue --
11fdf7f2 1741 friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
224ce89b 1742
7c673cae 1743 const io_queue op_queue;
11fdf7f2 1744public:
7c673cae 1745 const unsigned int op_prio_cutoff;
11fdf7f2 1746protected:
1747
1748 /*
1749 * The ordered op delivery chain is:
1750 *
1751 * fast dispatch -> pqueue back
1752 * pqueue front <-> to_process back
1753 * to_process front -> RunVis(item)
1754 * <- queue_front()
1755 *
1756 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1757 * pushed back up into to_process and/or pqueue while order is preserved.
1758 *
1759 * Multiple worker threads can operate on each shard.
1760 *
 1761 * Under normal circumstances, num_running == to_process.size(). There are
7c673cae
FG
1762 * two times when that is not true: (1) when waiting_for_pg == true and
1763 * to_process is accumulating requests that are waiting for the pg to be
1764 * instantiated; in that case they will all get requeued together by
 1765 * wake_pg_waiters, and (2) when wake_pg_waiters has just run, cleared
 1766 * waiting_for_pg, and already requeued the items.
1767 */
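  // Minimal toy sketch (simplified types; not the real OSDShard/pg_slot code)
  // of the ordering rule described above: items pulled from the shard pqueue
  // are appended to the back of the slot's to_process list, and anything that
  // cannot run yet is put back at the *front*, so per-slot arrival order is
  // preserved.
  //
  //   #include <deque>
  //   struct ToySlot {
  //     std::deque<int> to_process;                    // per pg_slot
  //     void pulled_from_pqueue(int i) { to_process.push_back(i);  }
  //     void requeue(int i)            { to_process.push_front(i); }
  //     int take_next() {
  //       int i = to_process.front();
  //       to_process.pop_front();
  //       return i;
  //     }
  //   };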
11fdf7f2
TL
1768 friend class PGOpItem;
1769 friend class PGPeeringItem;
1770 friend class PGRecovery;
1771 friend class PGDelete;
224ce89b 1772
7c673cae 1773 class ShardedOpWQ
11fdf7f2 1774 : public ShardedThreadPool::ShardedWQ<OpQueueItem>
7c673cae 1775 {
7c673cae 1776 OSD *osd;
7c673cae
FG
1777
1778 public:
11fdf7f2 1779 ShardedOpWQ(OSD *o,
7c673cae
FG
1780 time_t ti,
1781 time_t si,
1782 ShardedThreadPool* tp)
11fdf7f2
TL
1783 : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
1784 osd(o) {
7c673cae
FG
1785 }
1786
11fdf7f2
TL
1787 void _add_slot_waiter(
1788 spg_t token,
1789 OSDShardPGSlot *slot,
1790 OpQueueItem&& qi);
7c673cae
FG
1791
1792 /// try to do some work
1793 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1794
1795 /// enqueue a new item
11fdf7f2 1796 void _enqueue(OpQueueItem&& item) override;
7c673cae
FG
1797
1798 /// requeue an old item (at the front of the line)
11fdf7f2 1799 void _enqueue_front(OpQueueItem&& item) override;
7c673cae
FG
1800
1801 void return_waiting_threads() override {
11fdf7f2
TL
1802 for(uint32_t i = 0; i < osd->num_shards; i++) {
1803 OSDShard* sdata = osd->shards[i];
 1804 ceph_assert(NULL != sdata);
1805 std::scoped_lock l{sdata->sdata_wait_lock};
1806 sdata->stop_waiting = true;
1807 sdata->sdata_cond.notify_all();
7c673cae
FG
1808 }
1809 }
1810
11fdf7f2
TL
1811 void stop_return_waiting_threads() override {
1812 for(uint32_t i = 0; i < osd->num_shards; i++) {
1813 OSDShard* sdata = osd->shards[i];
7c673cae 1814 ceph_assert(NULL != sdata);
11fdf7f2
TL
1815 std::scoped_lock l{sdata->sdata_wait_lock};
1816 sdata->stop_waiting = false;
1817 }
1818 }
1819
1820 void dump(Formatter *f) {
1821 for(uint32_t i = 0; i < osd->num_shards; i++) {
1822 auto &&sdata = osd->shards[i];
1823
1824 char queue_name[32] = {0};
1825 snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
1826 ceph_assert(NULL != sdata);
1827
1828 std::scoped_lock l{sdata->shard_lock};
1829 f->open_object_section(queue_name);
7c673cae
FG
1830 sdata->pqueue->dump(f);
1831 f->close_section();
7c673cae
FG
1832 }
1833 }
1834
11fdf7f2
TL
1835 bool is_shard_empty(uint32_t thread_index) override {
1836 uint32_t shard_index = thread_index % osd->num_shards;
1837 auto &&sdata = osd->shards[shard_index];
1838 ceph_assert(sdata);
1839 std::lock_guard l(sdata->shard_lock);
1840 if (thread_index < osd->num_shards) {
1841 return sdata->pqueue->empty() && sdata->context_queue.empty();
1842 } else {
1843 return sdata->pqueue->empty();
7c673cae 1844 }
11fdf7f2 1845 }
7c673cae 1846
11fdf7f2
TL
1847 void handle_oncommits(list<Context*>& oncommits) {
1848 for (auto p : oncommits) {
1849 p->complete(0);
1850 }
7c673cae
FG
1851 }
1852 } op_shardedwq;
1853
1854
11fdf7f2 1855 void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch);
7c673cae
FG
1856 void dequeue_op(
1857 PGRef pg, OpRequestRef op,
1858 ThreadPool::TPHandle &handle);
1859
11fdf7f2
TL
1860 void enqueue_peering_evt(
1861 spg_t pgid,
1862 PGPeeringEventRef ref);
1863 void enqueue_peering_evt_front(
1864 spg_t pgid,
1865 PGPeeringEventRef ref);
1866 void dequeue_peering_evt(
1867 OSDShard *sdata,
1868 PG *pg,
1869 PGPeeringEventRef ref,
1870 ThreadPool::TPHandle& handle);
1871
1872 void dequeue_delete(
1873 OSDShard *sdata,
1874 PG *pg,
1875 epoch_t epoch,
1876 ThreadPool::TPHandle& handle);
7c673cae
FG
1877
1878 friend class PG;
11fdf7f2 1879 friend class OSDShard;
7c673cae
FG
1880 friend class PrimaryLogPG;
1881
1882
1883 protected:
1884
1885 // -- osd map --
1886 OSDMapRef osdmap;
1887 OSDMapRef get_osdmap() {
1888 return osdmap;
1889 }
224ce89b 1890 epoch_t get_osdmap_epoch() const {
7c673cae
FG
1891 return osdmap ? osdmap->get_epoch() : 0;
1892 }
1893
11fdf7f2
TL
1894 pool_pg_num_history_t pg_num_history;
1895
7c673cae
FG
1896 utime_t had_map_since;
1897 RWLock map_lock;
1898 list<OpRequestRef> waiting_for_osdmap;
1899 deque<utime_t> osd_markdown_log;
1900
1901 friend struct send_map_on_destruct;
1902
1903 void wait_for_new_map(OpRequestRef op);
1904 void handle_osd_map(class MOSDMap *m);
1905 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1906 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1907 void note_down_osd(int osd);
1908 void note_up_osd(int osd);
1909 friend class C_OnMapCommit;
1910
1911 bool advance_pg(
11fdf7f2
TL
1912 epoch_t advance_to,
1913 PG *pg,
7c673cae 1914 ThreadPool::TPHandle &handle,
11fdf7f2 1915 PG::RecoveryCtx *rctx);
7c673cae
FG
1916 void consume_map();
1917 void activate_map();
1918
1919 // osd map cache (past osd maps)
1920 OSDMapRef get_map(epoch_t e) {
1921 return service.get_map(e);
1922 }
1923 OSDMapRef add_map(OSDMap *o) {
1924 return service.add_map(o);
1925 }
1926 void add_map_bl(epoch_t e, bufferlist& bl) {
1927 return service.add_map_bl(e, bl);
1928 }
7c673cae
FG
1929 bool get_map_bl(epoch_t e, bufferlist& bl) {
1930 return service.get_map_bl(e, bl);
1931 }
1932 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1933 return service.add_map_inc_bl(e, bl);
1934 }
11fdf7f2
TL
1935
1936public:
1937 // -- shards --
1938 vector<OSDShard*> shards;
1939 uint32_t num_shards = 0;
1940
1941 void inc_num_pgs() {
1942 ++num_pgs;
1943 }
1944 void dec_num_pgs() {
1945 --num_pgs;
1946 }
1947 int get_num_pgs() const {
1948 return num_pgs;
7c673cae
FG
1949 }
1950
1951protected:
11fdf7f2
TL
1952 Mutex merge_lock = {"OSD::merge_lock"};
1953 /// merge epoch -> target pgid -> source pgid -> pg
1954 map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
1955
1956 bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
1957 unsigned need);
1958
7c673cae 1959 // -- placement groups --
11fdf7f2 1960 std::atomic<size_t> num_pgs = {0};
7c673cae 1961
3efd9988 1962 std::mutex pending_creates_lock;
b32b8144
FG
1963 using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
1964 std::set<create_from_osd_t> pending_creates_from_osd;
3efd9988
FG
1965 unsigned pending_creates_from_mon = 0;
1966
7c673cae
FG
1967 PGRecoveryStats pg_recovery_stats;
1968
11fdf7f2
TL
1969 PGRef _lookup_pg(spg_t pgid);
1970 PGRef _lookup_lock_pg(spg_t pgid);
1971 void register_pg(PGRef pg);
1972 bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
7c673cae 1973
11fdf7f2
TL
1974 void _get_pgs(vector<PGRef> *v, bool clear_too=false);
1975 void _get_pgids(vector<spg_t> *v);
31f18b77
FG
1976
1977public:
11fdf7f2 1978 PGRef lookup_lock_pg(spg_t pgid);
31f18b77 1979
11fdf7f2 1980 std::set<int64_t> get_mapped_pools();
35e4c445 1981
31f18b77 1982protected:
7c673cae 1983 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
7c673cae 1984
11fdf7f2
TL
1985 bool maybe_wait_for_max_pg(const OSDMapRef& osdmap,
1986 spg_t pgid, bool is_mon_create);
3efd9988
FG
1987 void resume_creating_pg();
1988
7c673cae 1989 void load_pgs();
7c673cae
FG
1990
1991 /// build initial pg history and intervals on create
1992 void build_initial_pg_history(
1993 spg_t pgid,
1994 epoch_t created,
1995 utime_t created_stamp,
1996 pg_history_t *h,
1997 PastIntervals *pi);
1998
7c673cae
FG
1999 epoch_t last_pg_create_epoch;
2000
2001 void handle_pg_create(OpRequestRef op);
2002
2003 void split_pgs(
2004 PG *parent,
31f18b77 2005 const set<spg_t> &childpgids, set<PGRef> *out_pgs,
7c673cae
FG
2006 OSDMapRef curmap,
2007 OSDMapRef nextmap,
2008 PG::RecoveryCtx *rctx);
11fdf7f2 2009 void _finish_splits(set<PGRef>& pgs);
7c673cae
FG
2010
2011 // == monitor interaction ==
2012 Mutex mon_report_lock;
2013 utime_t last_mon_report;
11fdf7f2 2014 Finisher boot_finisher;
7c673cae
FG
2015
2016 // -- boot --
2017 void start_boot();
2018 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2019 void _preboot(epoch_t oldest, epoch_t newest);
2020 void _send_boot();
2021 void _collect_metadata(map<string,string> *pmeta);
2022
2023 void start_waiting_for_healthy();
2024 bool _is_healthy();
2025
2026 void send_full_update();
2027
2028 friend struct C_OSD_GetVersion;
2029
2030 // -- alive --
2031 epoch_t up_thru_wanted;
2032
2033 void queue_want_up_thru(epoch_t want);
2034 void send_alive();
2035
2036 // -- full map requests --
2037 epoch_t requested_full_first, requested_full_last;
2038
2039 void request_full_map(epoch_t first, epoch_t last);
2040 void rerequest_full_maps() {
2041 epoch_t first = requested_full_first;
2042 epoch_t last = requested_full_last;
2043 requested_full_first = 0;
2044 requested_full_last = 0;
2045 request_full_map(first, last);
2046 }
2047 void got_full_map(epoch_t e);
2048
2049 // -- failures --
2050 map<int,utime_t> failure_queue;
11fdf7f2 2051 map<int,pair<utime_t,entity_addrvec_t> > failure_pending;
7c673cae
FG
2052
2053 void requeue_failures();
2054 void send_failures();
11fdf7f2
TL
2055 void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs);
2056 void cancel_pending_failures();
7c673cae
FG
2057
2058 ceph::coarse_mono_clock::time_point last_sent_beacon;
2059 Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2060 epoch_t min_last_epoch_clean = 0;
2061 // which pgs were scanned for min_lec
2062 std::vector<pg_t> min_last_epoch_clean_pgs;
2063 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2064
7c673cae
FG
2065 ceph_tid_t get_tid() {
2066 return service.get_tid();
2067 }
2068
2069 // -- generic pg peering --
2070 PG::RecoveryCtx create_context();
2071 void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2072 ThreadPool::TPHandle *handle = NULL);
2073 void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2074 ThreadPool::TPHandle *handle = NULL);
11fdf7f2 2075 void discard_context(PG::RecoveryCtx &ctx);
7c673cae
FG
2076 void do_notifies(map<int,
2077 vector<pair<pg_notify_t, PastIntervals> > >&
2078 notify_list,
2079 OSDMapRef map);
2080 void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2081 OSDMapRef map);
2082 void do_infos(map<int,
2083 vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2084 OSDMapRef map);
2085
2086 bool require_mon_peer(const Message *m);
2087 bool require_mon_or_mgr_peer(const Message *m);
2088 bool require_osd_peer(const Message *m);
2089 /***
 2090 * Verifies that we were alive in the given epoch, and that we
 2091 * still are.
2092 */
2093 bool require_self_aliveness(const Message *m, epoch_t alive_since);
2094 /**
2095 * Verifies that the OSD who sent the given op has the same
2096 * address as in the given map.
2097 * @pre op was sent by an OSD using the cluster messenger
2098 */
2099 bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2100 bool is_fast_dispatch);
2101
2102 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2103 bool is_fast_dispatch);
2104
11fdf7f2
TL
2105 void handle_fast_pg_create(MOSDPGCreate2 *m);
2106 void handle_fast_pg_query(MOSDPGQuery *m);
2107 void handle_pg_query_nopg(const MQuery& q);
2108 void handle_fast_pg_notify(MOSDPGNotify *m);
2109 void handle_pg_notify_nopg(const MNotifyRec& q);
2110 void handle_fast_pg_info(MOSDPGInfo *m);
2111 void handle_fast_pg_remove(MOSDPGRemove *m);
7c673cae 2112
11fdf7f2
TL
2113public:
2114 // used by OSDShard
2115 PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
2116protected:
c07f9fc5 2117
11fdf7f2 2118 void handle_fast_force_recovery(MOSDForceRecovery *m);
7c673cae
FG
2119
2120 // -- commands --
2121 struct Command {
2122 vector<string> cmd;
2123 ceph_tid_t tid;
2124 bufferlist indata;
2125 ConnectionRef con;
2126
2127 Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2128 : cmd(c), tid(t), indata(bl), con(co) {}
2129 };
2130 list<Command*> command_queue;
2131 struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2132 OSD *osd;
2133 CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2134 : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2135
2136 bool _empty() override {
2137 return osd->command_queue.empty();
2138 }
2139 bool _enqueue(Command *c) override {
2140 osd->command_queue.push_back(c);
2141 return true;
2142 }
2143 void _dequeue(Command *pg) override {
2144 ceph_abort();
2145 }
2146 Command *_dequeue() override {
2147 if (osd->command_queue.empty())
2148 return NULL;
2149 Command *c = osd->command_queue.front();
2150 osd->command_queue.pop_front();
2151 return c;
2152 }
2153 void _process(Command *c, ThreadPool::TPHandle &) override {
11fdf7f2 2154 osd->osd_lock.lock();
7c673cae 2155 if (osd->is_stopping()) {
11fdf7f2 2156 osd->osd_lock.unlock();
7c673cae
FG
2157 delete c;
2158 return;
2159 }
2160 osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
11fdf7f2 2161 osd->osd_lock.unlock();
7c673cae
FG
2162 delete c;
2163 }
2164 void _clear() override {
2165 while (!osd->command_queue.empty()) {
2166 Command *c = osd->command_queue.front();
2167 osd->command_queue.pop_front();
2168 delete c;
2169 }
2170 }
2171 } command_wq;
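  // Illustrative sketch only (the real entry point is OSD::handle_command()
  // in OSD.cc): an incoming MCommand m from connection ref `con` is wrapped
  // in a Command and pushed through command_wq; _process() above then runs
  // do_command() under osd_lock.
  //
  //   Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get());
  //   command_wq.queue(c);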
2172
2173 void handle_command(class MMonCommand *m);
2174 void handle_command(class MCommand *m);
2175 void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
11fdf7f2
TL
2176 int _do_command(
2177 Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data,
2178 bufferlist& odata, stringstream& ss, stringstream& ds);
2179
7c673cae
FG
2180
2181 // -- pg recovery --
2182 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2183 ThreadPool::TPHandle &handle);
2184
2185
2186 // -- scrubbing --
2187 void sched_scrub();
494da23a 2188 void resched_all_scrubs();
7c673cae
FG
2189 bool scrub_random_backoff();
2190 bool scrub_load_below_threshold();
2191 bool scrub_time_permit(utime_t now);
2192
b32b8144
FG
2193 // -- status reporting --
2194 MPGStats *collect_pg_stats();
11fdf7f2
TL
2195 std::vector<DaemonHealthMetric> get_health_metrics();
2196
b32b8144 2197
224ce89b 2198private:
7c673cae
FG
2199 bool ms_can_fast_dispatch_any() const override { return true; }
2200 bool ms_can_fast_dispatch(const Message *m) const override {
2201 switch (m->get_type()) {
11fdf7f2 2202 case CEPH_MSG_PING:
7c673cae
FG
2203 case CEPH_MSG_OSD_OP:
2204 case CEPH_MSG_OSD_BACKOFF:
11fdf7f2
TL
2205 case MSG_OSD_SCRUB2:
2206 case MSG_OSD_FORCE_RECOVERY:
2207 case MSG_MON_COMMAND:
2208 case MSG_OSD_PG_CREATE2:
2209 case MSG_OSD_PG_QUERY:
2210 case MSG_OSD_PG_INFO:
2211 case MSG_OSD_PG_NOTIFY:
2212 case MSG_OSD_PG_LOG:
2213 case MSG_OSD_PG_TRIM:
2214 case MSG_OSD_PG_REMOVE:
2215 case MSG_OSD_BACKFILL_RESERVE:
2216 case MSG_OSD_RECOVERY_RESERVE:
7c673cae 2217 case MSG_OSD_REPOP:
7c673cae
FG
2218 case MSG_OSD_REPOPREPLY:
2219 case MSG_OSD_PG_PUSH:
2220 case MSG_OSD_PG_PULL:
2221 case MSG_OSD_PG_PUSH_REPLY:
2222 case MSG_OSD_PG_SCAN:
2223 case MSG_OSD_PG_BACKFILL:
2224 case MSG_OSD_PG_BACKFILL_REMOVE:
2225 case MSG_OSD_EC_WRITE:
2226 case MSG_OSD_EC_WRITE_REPLY:
2227 case MSG_OSD_EC_READ:
2228 case MSG_OSD_EC_READ_REPLY:
2229 case MSG_OSD_SCRUB_RESERVE:
2230 case MSG_OSD_REP_SCRUB:
2231 case MSG_OSD_REP_SCRUBMAP:
2232 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2233 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
c07f9fc5
FG
2234 case MSG_OSD_PG_RECOVERY_DELETE:
2235 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
7c673cae
FG
2236 return true;
2237 default:
2238 return false;
2239 }
2240 }
2241 void ms_fast_dispatch(Message *m) override;
7c673cae 2242 bool ms_dispatch(Message *m) override;
11fdf7f2 2243 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override;
7c673cae
FG
2244 void ms_handle_connect(Connection *con) override;
2245 void ms_handle_fast_connect(Connection *con) override;
2246 void ms_handle_fast_accept(Connection *con) override;
11fdf7f2
TL
2247 int ms_handle_authentication(Connection *con) override;
2248 KeyStore *ms_get_auth1_authorizer_keystore() override;
7c673cae
FG
2249 bool ms_handle_reset(Connection *con) override;
2250 void ms_handle_remote_reset(Connection *con) override {}
2251 bool ms_handle_refused(Connection *con) override;
2252
2253 io_queue get_io_queue() const {
2254 if (cct->_conf->osd_op_queue == "debug_random") {
224ce89b
WB
2255 static io_queue index_lookup[] = { io_queue::prioritized,
2256 io_queue::weightedpriority,
2257 io_queue::mclock_opclass,
2258 io_queue::mclock_client };
7c673cae 2259 srand(time(NULL));
224ce89b
WB
2260 unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
2261 return index_lookup[which];
d2e6a577
FG
2262 } else if (cct->_conf->osd_op_queue == "prioritized") {
2263 return io_queue::prioritized;
224ce89b
WB
2264 } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
2265 return io_queue::mclock_opclass;
2266 } else if (cct->_conf->osd_op_queue == "mclock_client") {
2267 return io_queue::mclock_client;
7c673cae 2268 } else {
d2e6a577
FG
2269 // default / catch-all is 'wpq'
2270 return io_queue::weightedpriority;
7c673cae
FG
2271 }
2272 }
2273
2274 unsigned int get_io_prio_cut() const {
2275 if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2276 srand(time(NULL));
2277 return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
d2e6a577 2278 } else if (cct->_conf->osd_op_queue_cut_off == "high") {
7c673cae 2279 return CEPH_MSG_PRIO_HIGH;
d2e6a577
FG
2280 } else {
2281 // default / catch-all is 'low'
2282 return CEPH_MSG_PRIO_LOW;
7c673cae
FG
2283 }
2284 }
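  // Both choices above are driven by configuration; e.g. a ceph.conf fragment
  // like the following (illustrative values; any of the strings handled above
  // work):
  //
  //   [osd]
  //   osd_op_queue = wpq
  //   osd_op_queue_cut_off = high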
2285
2286 public:
 2287 /* internal and external can point to the same messenger; they will still
 2288 * be cleaned up properly */
2289 OSD(CephContext *cct_,
2290 ObjectStore *store_,
2291 int id,
2292 Messenger *internal,
2293 Messenger *external,
2294 Messenger *hb_front_client,
2295 Messenger *hb_back_client,
2296 Messenger *hb_front_server,
2297 Messenger *hb_back_server,
2298 Messenger *osdc_messenger,
2299 MonClient *mc, const std::string &dev, const std::string &jdev);
2300 ~OSD() override;
2301
2302 // static bits
11fdf7f2
TL
2303 static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami);
2304
7c673cae
FG
 2305 /* remove any non-user xattrs from the given xattr map */
2306 void filter_xattrs(map<string, bufferptr>& attrs) {
2307 for (map<string, bufferptr>::iterator iter = attrs.begin();
2308 iter != attrs.end();
2309 ) {
2310 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2311 attrs.erase(iter++);
2312 else ++iter;
2313 }
2314 }
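  // Illustrative usage with hypothetical keys: only entries whose stored name
  // starts with '_' and is longer than a single character survive the loop
  // above.
  //
  //   std::map<std::string, ceph::buffer::ptr> attrs;
  //   attrs["_user.foo"];     // kept   (user xattr, stored with a '_' prefix)
  //   attrs["ceph.internal"]; // erased (does not start with '_')
  //   attrs["_"];             // erased (degenerate single-character name)
  //   filter_xattrs(attrs);   // attrs now holds only "_user.foo"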
2315
2316private:
2317 int mon_cmd_maybe_osd_create(string &cmd);
2318 int update_crush_device_class();
2319 int update_crush_location();
2320
3efd9988
FG
2321 static int write_meta(CephContext *cct,
2322 ObjectStore *store,
7c673cae
FG
2323 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2324
7c673cae 2325 void handle_scrub(struct MOSDScrub *m);
11fdf7f2 2326 void handle_fast_scrub(struct MOSDScrub2 *m);
7c673cae
FG
2327 void handle_osd_ping(class MOSDPing *m);
2328
2329 int init_op_flags(OpRequestRef& op);
2330
31f18b77
FG
2331 int get_num_op_shards();
2332 int get_num_op_threads();
2333
c07f9fc5 2334 float get_osd_recovery_sleep();
11fdf7f2 2335 float get_osd_delete_sleep();
494da23a 2336 float get_osd_snap_trim_sleep();
11fdf7f2
TL
2337
2338 void probe_smart(const string& devid, ostream& ss);
c07f9fc5 2339
7c673cae 2340public:
11fdf7f2
TL
2341 static int peek_meta(ObjectStore *store,
2342 string *magic,
2343 uuid_d *cluster_fsid,
2344 uuid_d *osd_fsid,
2345 int *whoami,
2346 int *min_osd_release);
7c673cae
FG
2347
2348
2349 // startup/shutdown
2350 int pre_init();
2351 int init();
2352 void final_init();
2353
2354 int enable_disable_fuse(bool stop);
11fdf7f2 2355 int set_numa_affinity();
7c673cae
FG
2356
2357 void suicide(int exitcode);
2358 int shutdown();
2359
2360 void handle_signal(int signum);
2361
2362 /// check if we can throw out op from a disconnected client
2363 static bool op_is_discardable(const MOSDOp *m);
2364
2365public:
2366 OSDService service;
2367 friend class OSDService;
11fdf7f2
TL
2368
2369private:
2370 void set_perf_queries(
2371 const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries);
2372 void get_perf_reports(
2373 std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
2374
2375 Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
2376 std::list<OSDPerfMetricQuery> m_perf_queries;
2377 std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
7c673cae
FG
2378};
2379
224ce89b 2380
11fdf7f2 2381std::ostream& operator<<(std::ostream& out, const io_queue& q);
224ce89b
WB
2382
2383
7c673cae
FG
2384//compatibility of the executable
2385extern const CompatSet::Feature ceph_osd_feature_compat[];
2386extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2387extern const CompatSet::Feature ceph_osd_feature_incompat[];
2388
224ce89b 2389#endif // CEPH_OSD_H