1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OSD_H
16#define CEPH_OSD_H
17
18#include "PG.h"
19
20#include "msg/Dispatcher.h"
21
22#include "common/Mutex.h"
23#include "common/RWLock.h"
24#include "common/Timer.h"
25#include "common/WorkQueue.h"
26#include "common/AsyncReserver.h"
27#include "common/ceph_context.h"
28#include "common/zipkin_trace.h"
29
30#include "mgr/MgrClient.h"
31
32#include "os/ObjectStore.h"
33#include "OSDCap.h"
34
35#include "auth/KeyRing.h"
36#include "osd/ClassHandler.h"
37
38#include "include/CompatSet.h"
39
40#include "OpRequest.h"
41#include "Session.h"
42
43#include "osd/PGQueueable.h"
44
45#include <atomic>
46#include <map>
47#include <memory>
48#include "include/memory.h"
49using namespace std;
50
51#include "include/unordered_map.h"
52
53#include "common/shared_cache.hpp"
54#include "common/simple_cache.hpp"
55#include "common/sharedptr_registry.hpp"
56#include "common/WeightedPriorityQueue.h"
57#include "common/PrioritizedQueue.h"
58#include "osd/mClockOpClassQueue.h"
59#include "osd/mClockClientQueue.h"
60#include "messages/MOSDOp.h"
61#include "include/Spinlock.h"
62#include "common/EventTrace.h"
63
64#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
65
66
67enum {
68 l_osd_first = 10000,
69 l_osd_op_wip,
70 l_osd_op,
71 l_osd_op_inb,
72 l_osd_op_outb,
73 l_osd_op_lat,
74 l_osd_op_process_lat,
75 l_osd_op_prepare_lat,
76 l_osd_op_r,
77 l_osd_op_r_outb,
78 l_osd_op_r_lat,
79 l_osd_op_r_lat_outb_hist,
80 l_osd_op_r_process_lat,
81 l_osd_op_r_prepare_lat,
82 l_osd_op_w,
83 l_osd_op_w_inb,
84 l_osd_op_w_lat,
85 l_osd_op_w_lat_inb_hist,
86 l_osd_op_w_process_lat,
87 l_osd_op_w_prepare_lat,
88 l_osd_op_rw,
89 l_osd_op_rw_inb,
90 l_osd_op_rw_outb,
91 l_osd_op_rw_lat,
92 l_osd_op_rw_lat_inb_hist,
93 l_osd_op_rw_lat_outb_hist,
94 l_osd_op_rw_process_lat,
95 l_osd_op_rw_prepare_lat,
96
97 l_osd_op_before_queue_op_lat,
98 l_osd_op_before_dequeue_op_lat,
99
100 l_osd_sop,
101 l_osd_sop_inb,
102 l_osd_sop_lat,
103 l_osd_sop_w,
104 l_osd_sop_w_inb,
105 l_osd_sop_w_lat,
106 l_osd_sop_pull,
107 l_osd_sop_pull_lat,
108 l_osd_sop_push,
109 l_osd_sop_push_inb,
110 l_osd_sop_push_lat,
111
112 l_osd_pull,
113 l_osd_push,
114 l_osd_push_outb,
115
116 l_osd_rop,
117
118 l_osd_loadavg,
119 l_osd_buf,
120 l_osd_history_alloc_bytes,
121 l_osd_history_alloc_num,
122 l_osd_cached_crc,
123 l_osd_cached_crc_adjusted,
124 l_osd_missed_crc,
125
126 l_osd_pg,
127 l_osd_pg_primary,
128 l_osd_pg_replica,
129 l_osd_pg_stray,
130 l_osd_pg_removing,
131 l_osd_hb_to,
132 l_osd_map,
133 l_osd_mape,
134 l_osd_mape_dup,
135
136 l_osd_waiting_for_map,
137
138 l_osd_map_cache_hit,
139 l_osd_map_cache_miss,
140 l_osd_map_cache_miss_low,
141 l_osd_map_cache_miss_low_avg,
142 l_osd_map_bl_cache_hit,
143 l_osd_map_bl_cache_miss,
144
145 l_osd_stat_bytes,
146 l_osd_stat_bytes_used,
147 l_osd_stat_bytes_avail,
148
149 l_osd_copyfrom,
150
151 l_osd_tier_promote,
152 l_osd_tier_flush,
153 l_osd_tier_flush_fail,
154 l_osd_tier_try_flush,
155 l_osd_tier_try_flush_fail,
156 l_osd_tier_evict,
157 l_osd_tier_whiteout,
158 l_osd_tier_dirty,
159 l_osd_tier_clean,
160 l_osd_tier_delay,
161 l_osd_tier_proxy_read,
162 l_osd_tier_proxy_write,
163
164 l_osd_agent_wake,
165 l_osd_agent_skip,
166 l_osd_agent_flush,
167 l_osd_agent_evict,
168
169 l_osd_object_ctx_cache_hit,
170 l_osd_object_ctx_cache_total,
171
172 l_osd_op_cache_hit,
173 l_osd_tier_flush_lat,
174 l_osd_tier_promote_lat,
175 l_osd_tier_r_lat,
176
177 l_osd_pg_info,
178 l_osd_pg_fastinfo,
179 l_osd_pg_biginfo,
180
181 l_osd_last,
182};
183
184// RecoveryState perf counters
185enum {
186 rs_first = 20000,
187 rs_initial_latency,
188 rs_started_latency,
189 rs_reset_latency,
190 rs_start_latency,
191 rs_primary_latency,
192 rs_peering_latency,
193 rs_backfilling_latency,
194 rs_waitremotebackfillreserved_latency,
195 rs_waitlocalbackfillreserved_latency,
196 rs_notbackfilling_latency,
197 rs_repnotrecovering_latency,
198 rs_repwaitrecoveryreserved_latency,
199 rs_repwaitbackfillreserved_latency,
200 rs_reprecovering_latency,
201 rs_activating_latency,
202 rs_waitlocalrecoveryreserved_latency,
203 rs_waitremoterecoveryreserved_latency,
204 rs_recovering_latency,
205 rs_recovered_latency,
206 rs_clean_latency,
207 rs_active_latency,
208 rs_replicaactive_latency,
209 rs_stray_latency,
210 rs_getinfo_latency,
211 rs_getlog_latency,
212 rs_waitactingchange_latency,
213 rs_incomplete_latency,
214 rs_down_latency,
215 rs_getmissing_latency,
216 rs_waitupthru_latency,
217 rs_notrecovering_latency,
218 rs_last,
219};
220
221class Messenger;
222class Message;
223class MonClient;
224class PerfCounters;
225class ObjectStore;
226class FuseStore;
227class OSDMap;
228class MLog;
229class Objecter;
230
231class Watch;
232class PrimaryLogPG;
233
234class AuthAuthorizeHandlerRegistry;
235
236class TestOpsSocketHook;
237struct C_CompleteSplits;
238struct C_OpenPGs;
239class LogChannel;
240class CephContext;
241typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
242class MOSDOp;
243
244class DeletingState {
245 Mutex lock;
246 Cond cond;
247 enum {
248 QUEUED,
249 CLEARING_DIR,
250 CLEARING_WAITING,
251 DELETING_DIR,
252 DELETED_DIR,
253 CANCELED,
254 } status;
255 bool stop_deleting;
256public:
257 const spg_t pgid;
258 const PGRef old_pg_state;
259 explicit DeletingState(const pair<spg_t, PGRef> &in) :
260 lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
261 pgid(in.first), old_pg_state(in.second) {
262 }
263
264 /// transition status to CLEARING_WAITING
265 bool pause_clearing() {
266 Mutex::Locker l(lock);
267 assert(status == CLEARING_DIR);
268 if (stop_deleting) {
269 status = CANCELED;
270 cond.Signal();
271 return false;
272 }
273 status = CLEARING_WAITING;
274 return true;
275 } ///< @return false if we should cancel deletion
276
277 /// start or resume the clearing - transition the status to CLEARING_DIR
278 bool start_or_resume_clearing() {
279 Mutex::Locker l(lock);
280 assert(
281 status == QUEUED ||
282 status == DELETED_DIR ||
283 status == CLEARING_WAITING);
284 if (stop_deleting) {
285 status = CANCELED;
286 cond.Signal();
287 return false;
288 }
289 status = CLEARING_DIR;
290 return true;
291 } ///< @return false if we should cancel the deletion
292
293 /// transition status to CLEARING_DIR
294 bool resume_clearing() {
295 Mutex::Locker l(lock);
296 assert(status == CLEARING_WAITING);
297 if (stop_deleting) {
298 status = CANCELED;
299 cond.Signal();
300 return false;
301 }
302 status = CLEARING_DIR;
303 return true;
304 } ///< @return false if we should cancel deletion
305
306 /// transition status to DELETING_DIR
307 bool start_deleting() {
308 Mutex::Locker l(lock);
309 assert(status == CLEARING_DIR);
310 if (stop_deleting) {
311 status = CANCELED;
312 cond.Signal();
313 return false;
314 }
315 status = DELETING_DIR;
316 return true;
317 } ///< @return false if we should cancel deletion
318
319 /// signal collection removal queued
320 void finish_deleting() {
321 Mutex::Locker l(lock);
322 assert(status == DELETING_DIR);
323 status = DELETED_DIR;
324 cond.Signal();
325 }
326
327 /// try to halt the deletion
328 bool try_stop_deletion() {
329 Mutex::Locker l(lock);
330 stop_deleting = true;
331 /**
332 * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
333 * operations we have to wait for before continuing on. States
334 * CLEARING_WAITING and QUEUED indicate that the remover will check
335 * stop_deleting before queueing any further operations. CANCELED
336 * indicates that the remover has already halted. DELETED_DIR
337 * indicates that the deletion has been fully queued.
338 */
339 while (status == DELETING_DIR || status == CLEARING_DIR)
340 cond.Wait(lock);
341 return status != DELETED_DIR;
342 } ///< @return true if we don't need to recreate the collection
343};
344typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
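/*
 * Sketch of how a remover thread might drive the DeletingState machine above
 * (illustrative only, not part of the OSD code; `dstate` is a
 * DeletingStateRef and more_objects_left()/remove_some_objects()/
 * remove_collection() are hypothetical helpers):
 *
 *   if (!dstate->start_or_resume_clearing())
 *     return;                                  // cancelled before we started
 *   while (more_objects_left()) {
 *     remove_some_objects();
 *     if (!dstate->pause_clearing() ||         // CLEARING_DIR -> CLEARING_WAITING
 *         !dstate->start_or_resume_clearing()) // ... and back again
 *       return;                                // cancelled while yielding
 *   }
 *   if (!dstate->start_deleting())             // CLEARING_DIR -> DELETING_DIR
 *     return;
 *   remove_collection();
 *   dstate->finish_deleting();                 // DELETING_DIR -> DELETED_DIR
 *
 * A concurrent caller that wants the PG back uses try_stop_deletion(): true
 * means the remover stopped before the collection was fully removed, so the
 * old collection can still be used; false (DELETED_DIR) means the removal was
 * fully queued and the collection must be recreated.
 */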
345
346class OSD;
347
348class OSDService {
349public:
350 OSD *osd;
351 CephContext *cct;
352 SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
353 ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
354 SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
355 const int whoami;
356 ObjectStore *&store;
357 LogClient &log_client;
358 LogChannelRef clog;
359 PGRecoveryStats &pg_recovery_stats;
360private:
361 Messenger *&cluster_messenger;
362 Messenger *&client_messenger;
363public:
364 PerfCounters *&logger;
365 PerfCounters *&recoverystate_perf;
366 MonClient *&monc;
367 ThreadPool::BatchWorkQueue<PG> &peering_wq;
368 GenContextWQ recovery_gen_wq;
369 ClassHandler *&class_handler;
370
371 void enqueue_back(spg_t pgid, PGQueueable qi);
372 void enqueue_front(spg_t pgid, PGQueueable qi);
373
374 void maybe_inject_dispatch_delay() {
375 if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
376 if (rand() % 10000 <
377 g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
378 utime_t t;
379 t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
380 t.sleep();
381 }
382 }
383 }
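  /*
   * Worked example (illustrative only): with
   * osd_debug_inject_dispatch_delay_probability = 0.1 and
   * osd_debug_inject_dispatch_delay_duration = 0.05, roughly 10% of calls
   * satisfy rand() % 10000 < 1000 and sleep for 50 ms before dispatch
   * continues.
   */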
384
385private:
386 // -- map epoch lower bound --
387 Mutex pg_epoch_lock;
388 multiset<epoch_t> pg_epochs;
389 map<spg_t,epoch_t> pg_epoch;
390
391public:
392 void pg_add_epoch(spg_t pgid, epoch_t epoch) {
393 Mutex::Locker l(pg_epoch_lock);
394 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
395 assert(t == pg_epoch.end());
396 pg_epoch[pgid] = epoch;
397 pg_epochs.insert(epoch);
398 }
399 void pg_update_epoch(spg_t pgid, epoch_t epoch) {
400 Mutex::Locker l(pg_epoch_lock);
401 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
402 assert(t != pg_epoch.end());
403 pg_epochs.erase(pg_epochs.find(t->second));
404 t->second = epoch;
405 pg_epochs.insert(epoch);
406 }
407 void pg_remove_epoch(spg_t pgid) {
408 Mutex::Locker l(pg_epoch_lock);
409 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
410 if (t != pg_epoch.end()) {
411 pg_epochs.erase(pg_epochs.find(t->second));
412 pg_epoch.erase(t);
413 }
414 }
415 epoch_t get_min_pg_epoch() {
416 Mutex::Locker l(pg_epoch_lock);
417 if (pg_epochs.empty())
418 return 0;
419 else
420 return *pg_epochs.begin();
421 }
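  /*
   * Usage sketch (illustrative only; `svc` is a hypothetical OSDService*):
   * callers keep the per-PG epoch bookkeeping current so that
   * get_min_pg_epoch() can serve as a lower bound (e.g. for deciding how far
   * old maps may be trimmed):
   *
   *   svc->pg_add_epoch(pgid, created_epoch);   // once, when the PG appears
   *   svc->pg_update_epoch(pgid, new_epoch);    // whenever the PG advances its map
   *   epoch_t floor = svc->get_min_pg_epoch();  // 0 if no PGs are registered
   *   svc->pg_remove_epoch(pgid);               // when the PG goes away
   */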
422
423private:
424 // -- superblock --
425 Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
426 OSDSuperblock superblock;
427
428public:
429 OSDSuperblock get_superblock() {
430 Mutex::Locker l(publish_lock);
431 return superblock;
432 }
433 void publish_superblock(const OSDSuperblock &block) {
434 Mutex::Locker l(publish_lock);
435 superblock = block;
436 }
437
438 int get_nodeid() const { return whoami; }
439
440 std::atomic<epoch_t> max_oldest_map;
441private:
442 OSDMapRef osdmap;
443
444public:
445 OSDMapRef get_osdmap() {
446 Mutex::Locker l(publish_lock);
447 return osdmap;
448 }
449 epoch_t get_osdmap_epoch() {
450 Mutex::Locker l(publish_lock);
451 return osdmap ? osdmap->get_epoch() : 0;
452 }
453 void publish_map(OSDMapRef map) {
454 Mutex::Locker l(publish_lock);
455 osdmap = map;
456 }
457
458 /*
459 * osdmap - current published map
460 * next_osdmap - pre_published map that is about to be published.
461 *
462 * We use the next_osdmap to send messages and initiate connections,
463 * but only if the target is the same instance as the one in the map
464 * epoch the current user is working from (i.e., the result is
465 * equivalent to what is in next_osdmap).
466 *
467 * This allows the helpers to start ignoring osds that are about to
468 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
469 * down, without worrying about reopening connections from threads
470 * working from old maps.
471 */
472private:
473 OSDMapRef next_osdmap;
474 Cond pre_publish_cond;
475
476public:
477 void pre_publish_map(OSDMapRef map) {
478 Mutex::Locker l(pre_publish_lock);
479 next_osdmap = std::move(map);
480 }
481
482 void activate_map();
483 /// map epochs reserved below
484 map<epoch_t, unsigned> map_reservations;
485
486 /// gets ref to next_osdmap and registers the epoch as reserved
487 OSDMapRef get_nextmap_reserved() {
488 Mutex::Locker l(pre_publish_lock);
489 if (!next_osdmap)
490 return OSDMapRef();
491 epoch_t e = next_osdmap->get_epoch();
492 map<epoch_t, unsigned>::iterator i =
493 map_reservations.insert(make_pair(e, 0)).first;
494 i->second++;
495 return next_osdmap;
496 }
497 /// releases reservation on map
498 void release_map(OSDMapRef osdmap) {
499 Mutex::Locker l(pre_publish_lock);
500 map<epoch_t, unsigned>::iterator i =
501 map_reservations.find(osdmap->get_epoch());
502 assert(i != map_reservations.end());
503 assert(i->second > 0);
504 if (--(i->second) == 0) {
505 map_reservations.erase(i);
506 }
507 pre_publish_cond.Signal();
508 }
509 /// blocks until there are no reserved maps prior to next_osdmap
510 void await_reserved_maps() {
511 Mutex::Locker l(pre_publish_lock);
512 assert(next_osdmap);
513 while (true) {
514 map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
515 if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
516 break;
517 } else {
518 pre_publish_cond.Wait(pre_publish_lock);
519 }
520 }
521 }
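  /*
   * Usage sketch (illustrative only; `svc` and `peer` are hypothetical): a
   * sender pins the pre-published map while it addresses peers, then drops
   * the reservation; a publisher may use await_reserved_maps() to wait until
   * no reservation on an older epoch remains:
   *
   *   OSDMapRef next = svc->get_nextmap_reserved();
   *   if (next) {
   *     ConnectionRef con = svc->get_con_osd_cluster(peer, next->get_epoch());
   *     // ... send messages using con/next ...
   *     svc->release_map(next);
   *   }
   */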
522
523private:
524 Mutex peer_map_epoch_lock;
525 map<int, epoch_t> peer_map_epoch;
526public:
527 epoch_t get_peer_epoch(int p);
528 epoch_t note_peer_epoch(int p, epoch_t e);
529 void forget_peer_epoch(int p, epoch_t e);
530
531 void send_map(class MOSDMap *m, Connection *con);
532 void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
533 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
534 OSDSuperblock& superblock);
535 bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
536 const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
537 void share_map(entity_name_t name, Connection *con, epoch_t epoch,
538 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
539 void share_map_peer(int peer, Connection *con,
540 OSDMapRef map = OSDMapRef());
541
542 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
543 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
544 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
545 void send_message_osd_cluster(Message *m, Connection *con) {
546 con->send_message(m);
547 }
548 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
549 con->send_message(m);
550 }
551 void send_message_osd_client(Message *m, Connection *con) {
552 con->send_message(m);
553 }
554 void send_message_osd_client(Message *m, const ConnectionRef& con) {
555 con->send_message(m);
556 }
557 entity_name_t get_cluster_msgr_name() {
558 return cluster_messenger->get_myname();
559 }
560
561private:
562 // -- scrub scheduling --
563 Mutex sched_scrub_lock;
564 int scrubs_pending;
565 int scrubs_active;
566
567public:
568 struct ScrubJob {
569 CephContext* cct;
570 /// pg to be scrubbed
571 spg_t pgid;
572 /// the time this scrub is scheduled for; the scrub may be delayed if the
573 /// system load is too high or if it falls outside the configured scrub hours
574 utime_t sched_time;
575 /// the hard upper bound of scrub time
576 utime_t deadline;
577 ScrubJob() : cct(nullptr) {}
578 explicit ScrubJob(CephContext* cct, const spg_t& pg,
579 const utime_t& timestamp,
580 double pool_scrub_min_interval = 0,
581 double pool_scrub_max_interval = 0, bool must = true);
582 /// order the jobs by sched_time
583 bool operator<(const ScrubJob& rhs) const;
584 };
585 set<ScrubJob> sched_scrub_pg;
586
587 /// @returns the scrub_reg_stamp used to unregister the scrub job
588 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
589 double pool_scrub_max_interval, bool must) {
590 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
591 must);
592 Mutex::Locker l(sched_scrub_lock);
593 sched_scrub_pg.insert(scrub);
594 return scrub.sched_time;
595 }
596 void unreg_pg_scrub(spg_t pgid, utime_t t) {
597 Mutex::Locker l(sched_scrub_lock);
598 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
599 assert(removed);
600 }
601 bool first_scrub_stamp(ScrubJob *out) {
602 Mutex::Locker l(sched_scrub_lock);
603 if (sched_scrub_pg.empty())
604 return false;
605 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
606 *out = *iter;
607 return true;
608 }
609 bool next_scrub_stamp(const ScrubJob& next,
610 ScrubJob *out) {
611 Mutex::Locker l(sched_scrub_lock);
612 if (sched_scrub_pg.empty())
613 return false;
614 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
615 if (iter == sched_scrub_pg.cend())
616 return false;
617 ++iter;
618 if (iter == sched_scrub_pg.cend())
619 return false;
620 *out = *iter;
621 return true;
622 }
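  /*
   * Usage sketch (illustrative only; `svc`, `pgid`, `now`, `pool_min`,
   * `pool_max` and `must` are hypothetical): registration returns the stamp
   * that later identifies the job, and the schedule can be walked in
   * sched_time order:
   *
   *   utime_t stamp = svc->reg_pg_scrub(pgid, now, pool_min, pool_max, must);
   *   // ...
   *   svc->unreg_pg_scrub(pgid, stamp);   // stamp must match the one returned
   *
   *   OSDService::ScrubJob job;
   *   for (bool ok = svc->first_scrub_stamp(&job);
   *        ok;
   *        ok = svc->next_scrub_stamp(job, &job)) {
   *     // job.sched_time is the next scheduled scrub after the previous one
   *   }
   */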
623
624 void dumps_scrub(Formatter *f) {
625 assert(f != nullptr);
626 Mutex::Locker l(sched_scrub_lock);
627
628 f->open_array_section("scrubs");
629 for (const auto &i: sched_scrub_pg) {
630 f->open_object_section("scrub");
631 f->dump_stream("pgid") << i.pgid;
632 f->dump_stream("sched_time") << i.sched_time;
633 f->dump_stream("deadline") << i.deadline;
634 f->dump_bool("forced", i.sched_time == i.deadline);
635 f->close_section();
636 }
637 f->close_section();
638 }
639
640 bool can_inc_scrubs_pending();
641 bool inc_scrubs_pending();
642 void inc_scrubs_active(bool reserved);
643 void dec_scrubs_pending();
644 void dec_scrubs_active();
645
646 void reply_op_error(OpRequestRef op, int err);
647 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
648 void handle_misdirected_op(PG *pg, OpRequestRef op);
649
650
651private:
652 // -- agent shared state --
653 Mutex agent_lock;
654 Cond agent_cond;
655 map<uint64_t, set<PGRef> > agent_queue;
656 set<PGRef>::iterator agent_queue_pos;
657 bool agent_valid_iterator;
658 int agent_ops;
659 int flush_mode_high_count; // number of pgs in FLUSH_MODE_HIGH; while any exist, objects are flushed at high speed
660 set<hobject_t> agent_oids;
661 bool agent_active;
662 struct AgentThread : public Thread {
663 OSDService *osd;
664 explicit AgentThread(OSDService *o) : osd(o) {}
665 void *entry() override {
666 osd->agent_entry();
667 return NULL;
668 }
669 } agent_thread;
670 bool agent_stop_flag;
671 Mutex agent_timer_lock;
672 SafeTimer agent_timer;
673
674public:
675 void agent_entry();
676 void agent_stop();
677
678 void _enqueue(PG *pg, uint64_t priority) {
679 if (!agent_queue.empty() &&
680 agent_queue.rbegin()->first < priority)
681 agent_valid_iterator = false; // inserting higher-priority queue
682 set<PGRef>& nq = agent_queue[priority];
683 if (nq.empty())
684 agent_cond.Signal();
685 nq.insert(pg);
686 }
687
688 void _dequeue(PG *pg, uint64_t old_priority) {
689 set<PGRef>& oq = agent_queue[old_priority];
690 set<PGRef>::iterator p = oq.find(pg);
691 assert(p != oq.end());
692 if (p == agent_queue_pos)
693 ++agent_queue_pos;
694 oq.erase(p);
695 if (oq.empty()) {
696 if (agent_queue.rbegin()->first == old_priority)
697 agent_valid_iterator = false;
698 agent_queue.erase(old_priority);
699 }
700 }
701
702 /// enable agent for a pg
703 void agent_enable_pg(PG *pg, uint64_t priority) {
704 Mutex::Locker l(agent_lock);
705 _enqueue(pg, priority);
706 }
707
708 /// adjust priority for an enabled pg
709 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
710 Mutex::Locker l(agent_lock);
711 assert(new_priority != old_priority);
712 _enqueue(pg, new_priority);
713 _dequeue(pg, old_priority);
714 }
715
716 /// disable agent for a pg
717 void agent_disable_pg(PG *pg, uint64_t old_priority) {
718 Mutex::Locker l(agent_lock);
719 _dequeue(pg, old_priority);
720 }
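  /*
   * Usage sketch (illustrative only; `svc`, `effort` and `new_effort` are
   * hypothetical): the priority passed to adjust/disable must match the one
   * the pg is currently queued under, since it keys agent_queue:
   *
   *   svc->agent_enable_pg(pg, effort);               // start tiering work
   *   svc->agent_adjust_pg(pg, effort, new_effort);   // requeue at new priority
   *   svc->agent_disable_pg(pg, new_effort);          // stop tiering work
   */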
721
722 /// note start of an async (evict) op
723 void agent_start_evict_op() {
724 Mutex::Locker l(agent_lock);
725 ++agent_ops;
726 }
727
728 /// note finish or cancellation of an async (evict) op
729 void agent_finish_evict_op() {
730 Mutex::Locker l(agent_lock);
731 assert(agent_ops > 0);
732 --agent_ops;
733 agent_cond.Signal();
734 }
735
736 /// note start of an async (flush) op
737 void agent_start_op(const hobject_t& oid) {
738 Mutex::Locker l(agent_lock);
739 ++agent_ops;
740 assert(agent_oids.count(oid) == 0);
741 agent_oids.insert(oid);
742 }
743
744 /// note finish or cancellation of an async (flush) op
745 void agent_finish_op(const hobject_t& oid) {
746 Mutex::Locker l(agent_lock);
747 assert(agent_ops > 0);
748 --agent_ops;
749 assert(agent_oids.count(oid) == 1);
750 agent_oids.erase(oid);
751 agent_cond.Signal();
752 }
753
754 /// check if we are operating on an object
755 bool agent_is_active_oid(const hobject_t& oid) {
756 Mutex::Locker l(agent_lock);
757 return agent_oids.count(oid);
758 }
759
760 /// get count of active agent ops
761 int agent_get_num_ops() {
762 Mutex::Locker l(agent_lock);
763 return agent_ops;
764 }
765
766 void agent_inc_high_count() {
767 Mutex::Locker l(agent_lock);
768 flush_mode_high_count ++;
769 }
770
771 void agent_dec_high_count() {
772 Mutex::Locker l(agent_lock);
773 flush_mode_high_count --;
774 }
775
776private:
777 /// throttle promotion attempts
778 std::atomic_uint promote_probability_millis{1000}; ///< promote probability in thousandths; a single word so it can be read locklessly
779 PromoteCounter promote_counter;
780 utime_t last_recalibrate;
781 unsigned long promote_max_objects, promote_max_bytes;
782
783public:
784 bool promote_throttle() {
785 // NOTE: lockless! we rely on the probability being a single word.
786 promote_counter.attempt();
787 if ((unsigned)rand() % 1000 > promote_probability_millis)
788 return true; // yes throttle (no promote)
789 if (promote_max_objects &&
790 promote_counter.objects > promote_max_objects)
791 return true; // yes throttle
792 if (promote_max_bytes &&
793 promote_counter.bytes > promote_max_bytes)
794 return true; // yes throttle
795 return false; // no throttle (promote)
796 }
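  /*
   * Worked example (illustrative only; do_promote(), obc and bytes are
   * hypothetical): with promote_probability_millis == 250, roughly one in
   * four attempts pass the random check and the rest are throttled. A caller
   * might look like:
   *
   *   if (!svc->promote_throttle()) {
   *     do_promote(obc);               // actually promote the object
   *     svc->promote_finish(bytes);    // feed the counters used by
   *   }                                // promote_throttle_recalibrate()
   */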
797 void promote_finish(uint64_t bytes) {
798 promote_counter.finish(bytes);
799 }
800 void promote_throttle_recalibrate();
801
802 // -- Objecter, for tiering reads/writes from/to other OSDs --
803 Objecter *objecter;
804 Finisher objecter_finisher;
805
806 // -- Watch --
807 Mutex watch_lock;
808 SafeTimer watch_timer;
809 uint64_t next_notif_id;
810 uint64_t get_next_id(epoch_t cur_epoch) {
811 Mutex::Locker l(watch_lock);
812 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
813 }
814
815 // -- Recovery/Backfill Request Scheduling --
816 Mutex recovery_request_lock;
817 SafeTimer recovery_request_timer;
818
819 // For async recovery sleep
820 bool recovery_needs_sleep = true;
821 utime_t recovery_schedule_time = utime_t();
822
823 Mutex recovery_sleep_lock;
824 SafeTimer recovery_sleep_timer;
825
826 // -- tids --
827 // for ops i issue
828 std::atomic_uint last_tid{0};
829 ceph_tid_t get_tid() {
830 return (ceph_tid_t)last_tid++;
831 }
832
833 // -- backfill_reservation --
834 Finisher reserver_finisher;
835 AsyncReserver<spg_t> local_reserver;
836 AsyncReserver<spg_t> remote_reserver;
837
838 // -- pg_temp --
839private:
840 Mutex pg_temp_lock;
841 struct pg_temp_t {
842 pg_temp_t()
843 {}
844 pg_temp_t(vector<int> v, bool f)
845 : acting{v}, forced{f}
846 {}
847 vector<int> acting;
848 bool forced = false;
849 };
850 map<pg_t, pg_temp_t> pg_temp_wanted;
851 map<pg_t, pg_temp_t> pg_temp_pending;
852 void _sent_pg_temp();
853 friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
854public:
855 void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
856 bool forced = false);
857 void remove_want_pg_temp(pg_t pgid);
858 void requeue_pg_temp();
859 void send_pg_temp();
860
861 void send_pg_created(pg_t pgid);
862
863 void queue_for_peering(PG *pg);
864
865 Mutex snap_sleep_lock;
866 SafeTimer snap_sleep_timer;
867
868 Mutex scrub_sleep_lock;
869 SafeTimer scrub_sleep_timer;
870
871 AsyncReserver<spg_t> snap_reserver;
872 void queue_for_snap_trim(PG *pg);
873
874 void queue_for_scrub(PG *pg, bool with_high_priority) {
875 unsigned scrub_queue_priority = pg->scrubber.priority;
876 if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
877 scrub_queue_priority = cct->_conf->osd_client_op_priority;
878 }
879 enqueue_back(
880 pg->info.pgid,
881 PGQueueable(
882 PGScrub(pg->get_osdmap()->get_epoch()),
883 cct->_conf->osd_scrub_cost,
884 scrub_queue_priority,
885 ceph_clock_now(),
886 entity_inst_t(),
887 pg->get_osdmap()->get_epoch()));
888 }
889
890private:
891 // -- pg recovery and associated throttling --
892 Mutex recovery_lock;
893 list<pair<epoch_t, PGRef> > awaiting_throttle;
894
895 utime_t defer_recovery_until;
896 uint64_t recovery_ops_active;
897 uint64_t recovery_ops_reserved;
898 bool recovery_paused;
899#ifdef DEBUG_RECOVERY_OIDS
900 map<spg_t, set<hobject_t> > recovery_oids;
901#endif
902 bool _recover_now(uint64_t *available_pushes);
903 void _maybe_queue_recovery();
904 void _queue_for_recovery(
905 pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
906 assert(recovery_lock.is_locked_by_me());
907 enqueue_back(
908 p.second->info.pgid,
909 PGQueueable(
910 PGRecovery(p.first, reserved_pushes),
911 cct->_conf->osd_recovery_cost,
912 cct->_conf->osd_recovery_priority,
913 ceph_clock_now(),
914 entity_inst_t(),
915 p.first));
916 }
917public:
918 void start_recovery_op(PG *pg, const hobject_t& soid);
919 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
920 bool is_recovery_active();
921 void release_reserved_pushes(uint64_t pushes) {
922 Mutex::Locker l(recovery_lock);
923 assert(recovery_ops_reserved >= pushes);
924 recovery_ops_reserved -= pushes;
925 _maybe_queue_recovery();
926 }
927 void defer_recovery(float defer_for) {
928 defer_recovery_until = ceph_clock_now();
929 defer_recovery_until += defer_for;
930 }
931 void pause_recovery() {
932 Mutex::Locker l(recovery_lock);
933 recovery_paused = true;
934 }
935 bool recovery_is_paused() {
936 Mutex::Locker l(recovery_lock);
937 return recovery_paused;
938 }
939 void unpause_recovery() {
940 Mutex::Locker l(recovery_lock);
941 recovery_paused = false;
942 _maybe_queue_recovery();
943 }
944 void kick_recovery_queue() {
945 Mutex::Locker l(recovery_lock);
946 _maybe_queue_recovery();
947 }
948 void clear_queued_recovery(PG *pg) {
949 Mutex::Locker l(recovery_lock);
950 for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
951 i != awaiting_throttle.end();
952 ) {
953 if (i->second.get() == pg) {
954 awaiting_throttle.erase(i);
955 return;
956 } else {
957 ++i;
958 }
959 }
960 }
961 // delayed pg activation
962 void queue_for_recovery(PG *pg) {
963 Mutex::Locker l(recovery_lock);
964
965 if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
966 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
967 } else {
968 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
969 }
970 _maybe_queue_recovery();
971 }
972 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
973 Mutex::Locker l(recovery_lock);
974 _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
975 }
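  /*
   * Usage sketch (illustrative only; `svc` and `reserved_pushes` are
   * hypothetical): a PG queues itself and waits for the recovery throttle;
   * forced recovery/backfill jumps to the front of the line, and the reserved
   * pushes carried by the dequeued PGRecovery item are returned afterwards:
   *
   *   svc->queue_for_recovery(pg);
   *   // ... later, once the dequeued PGRecovery item has been processed ...
   *   svc->release_reserved_pushes(reserved_pushes);
   */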
976
977 void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
978
979 // osd map cache (past osd maps)
980 Mutex map_cache_lock;
981 SharedLRU<epoch_t, const OSDMap> map_cache;
982 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
983 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
984
985 OSDMapRef try_get_map(epoch_t e);
986 OSDMapRef get_map(epoch_t e) {
987 OSDMapRef ret(try_get_map(e));
988 assert(ret);
989 return ret;
990 }
991 OSDMapRef add_map(OSDMap *o) {
992 Mutex::Locker l(map_cache_lock);
993 return _add_map(o);
994 }
995 OSDMapRef _add_map(OSDMap *o);
996
997 void add_map_bl(epoch_t e, bufferlist& bl) {
998 Mutex::Locker l(map_cache_lock);
999 return _add_map_bl(e, bl);
1000 }
1001 void pin_map_bl(epoch_t e, bufferlist &bl);
1002 void _add_map_bl(epoch_t e, bufferlist& bl);
1003 bool get_map_bl(epoch_t e, bufferlist& bl) {
1004 Mutex::Locker l(map_cache_lock);
1005 return _get_map_bl(e, bl);
1006 }
1007 bool _get_map_bl(epoch_t e, bufferlist& bl);
1008
1009 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1010 Mutex::Locker l(map_cache_lock);
1011 return _add_map_inc_bl(e, bl);
1012 }
1013 void pin_map_inc_bl(epoch_t e, bufferlist &bl);
1014 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
1015 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
1016
1017 void clear_map_bl_cache_pins(epoch_t e);
1018
1019 void need_heartbeat_peer_update();
1020
1021 void pg_stat_queue_enqueue(PG *pg);
1022 void pg_stat_queue_dequeue(PG *pg);
1023
1024 void init();
1025 void final_init();
1026 void start_shutdown();
1027 void shutdown_reserver();
1028 void shutdown();
1029
1030private:
1031 // split
1032 Mutex in_progress_split_lock;
1033 map<spg_t, spg_t> pending_splits; // child -> parent
1034 map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
1035 set<spg_t> in_progress_splits; // child
1036
1037public:
1038 void _start_split(spg_t parent, const set<spg_t> &children);
1039 void start_split(spg_t parent, const set<spg_t> &children) {
1040 Mutex::Locker l(in_progress_split_lock);
1041 return _start_split(parent, children);
1042 }
1043 void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
1044 void complete_split(const set<spg_t> &pgs);
1045 void cancel_pending_splits_for_parent(spg_t parent);
1046 void _cancel_pending_splits_for_parent(spg_t parent);
1047 bool splitting(spg_t pgid);
1048 void expand_pg_num(OSDMapRef old_map,
1049 OSDMapRef new_map);
1050 void _maybe_split_pgid(OSDMapRef old_map,
1051 OSDMapRef new_map,
1052 spg_t pgid);
1053 void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
1054
1055 // -- stats --
1056 Mutex stat_lock;
1057 osd_stat_t osd_stat;
1058 uint32_t seq = 0;
1059
1060 void update_osd_stat(vector<int>& hb_peers);
1061 osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
1062 vector<int>& hb_peers,
1063 int num_pgs);
1064 osd_stat_t get_osd_stat() {
1065 Mutex::Locker l(stat_lock);
1066 ++seq;
1067 osd_stat.up_from = up_epoch;
1068 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
1069 return osd_stat;
1070 }
1071 uint64_t get_osd_stat_seq() {
1072 Mutex::Locker l(stat_lock);
1073 return osd_stat.seq;
1074 }
1075
1076 // -- OSD Full Status --
1077private:
1078 friend TestOpsSocketHook;
1079 mutable Mutex full_status_lock;
1080 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
1081 const char *get_full_state_name(s_names s) const {
1082 switch (s) {
1083 case NONE: return "none";
1084 case NEARFULL: return "nearfull";
1085 case BACKFILLFULL: return "backfillfull";
1086 case FULL: return "full";
1087 case FAILSAFE: return "failsafe";
1088 default: return "???";
1089 }
1090 }
1091 s_names get_full_state(string type) const {
1092 if (type == "none")
1093 return NONE;
1094 else if (type == "failsafe")
1095 return FAILSAFE;
1096 else if (type == "full")
1097 return FULL;
1098 else if (type == "backfillfull")
1099 return BACKFILLFULL;
1100 else if (type == "nearfull")
1101 return NEARFULL;
1102 else
1103 return INVALID;
1104 }
1105 double cur_ratio; ///< current utilization
1106 mutable int64_t injectfull = 0;
1107 s_names injectfull_state = NONE;
1108 float get_failsafe_full_ratio();
1109 void check_full_status(float ratio);
1110 bool _check_full(s_names type, ostream &ss) const;
1111public:
1112 bool check_failsafe_full(ostream &ss) const;
1113 bool check_full(ostream &ss) const;
1114 bool check_backfill_full(ostream &ss) const;
1115 bool check_nearfull(ostream &ss) const;
1116 bool is_failsafe_full() const;
1117 bool is_full() const;
1118 bool is_backfillfull() const;
1119 bool is_nearfull() const;
1120 bool need_fullness_update(); ///< osdmap state needs update
1121 void set_injectfull(s_names type, int64_t count);
1122 bool check_osdmap_full(const set<pg_shard_t> &missing_on);
1123
1124
1125 // -- epochs --
1126private:
1127 mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
1128 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
1129 epoch_t up_epoch; // _most_recent_ epoch we were marked up
1130 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
1131public:
1132 /**
1133 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1134 * can be NULL if you don't care about them.
1135 */
1136 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
1137 epoch_t *_bind_epoch) const;
1138 /**
1139 * Set the boot, up, and bind epochs. Any NULL params will not be set.
1140 */
1141 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1142 const epoch_t *_bind_epoch);
1143 epoch_t get_boot_epoch() const {
1144 epoch_t ret;
1145 retrieve_epochs(&ret, NULL, NULL);
1146 return ret;
1147 }
1148 epoch_t get_up_epoch() const {
1149 epoch_t ret;
1150 retrieve_epochs(NULL, &ret, NULL);
1151 return ret;
1152 }
1153 epoch_t get_bind_epoch() const {
1154 epoch_t ret;
1155 retrieve_epochs(NULL, NULL, &ret);
1156 return ret;
1157 }
1158
1159 void request_osdmap_update(epoch_t e);
1160
1161 // -- stopping --
1162 Mutex is_stopping_lock;
1163 Cond is_stopping_cond;
1164 enum {
1165 NOT_STOPPING,
1166 PREPARING_TO_STOP,
1167 STOPPING };
1168 std::atomic_int state{NOT_STOPPING};
1169 int get_state() {
1170 return state;
1171 }
1172 void set_state(int s) {
1173 state = s;
1174 }
1175 bool is_stopping() const {
1176 return state == STOPPING;
1177 }
1178 bool is_preparing_to_stop() const {
1179 return state == PREPARING_TO_STOP;
1180 }
1181 bool prepare_to_stop();
1182 void got_stop_ack();
1183
1184
1185#ifdef PG_DEBUG_REFS
1186 Mutex pgid_lock;
1187 map<spg_t, int> pgid_tracker;
1188 map<spg_t, PG*> live_pgs;
1189 void add_pgid(spg_t pgid, PG *pg);
1190 void remove_pgid(spg_t pgid, PG *pg);
1191 void dump_live_pgids();
1192#endif
1193
1194 explicit OSDService(OSD *osd);
1195 ~OSDService();
1196};
1197
1198class OSD : public Dispatcher,
1199 public md_config_obs_t {
1200 /** OSD **/
1201 Mutex osd_lock; // global lock
1202 SafeTimer tick_timer; // safe timer (osd_lock)
1203
1204 // Tick timer for work that does not need osd_lock
1205 Mutex tick_timer_lock;
1206 SafeTimer tick_timer_without_osd_lock;
1207public:
1208 // config observer bits
1209 const char** get_tracked_conf_keys() const override;
1210 void handle_conf_change(const struct md_config_t *conf,
1211 const std::set <std::string> &changed) override;
1212 void update_log_config();
1213 void check_config();
1214
1215protected:
1216
1217 static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
1218
1219 AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
1220 AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
1221
1222 Messenger *cluster_messenger;
1223 Messenger *client_messenger;
1224 Messenger *objecter_messenger;
1225 MonClient *monc; // check the "monc helpers" list before accessing directly
1226 MgrClient mgrc;
1227 PerfCounters *logger;
1228 PerfCounters *recoverystate_perf;
1229 ObjectStore *store;
1230#ifdef HAVE_LIBFUSE
1231 FuseStore *fuse_store = nullptr;
1232#endif
1233 LogClient log_client;
1234 LogChannelRef clog;
1235
1236 int whoami;
1237 std::string dev_path, journal_path;
1238
1239 bool store_is_rotational = true;
1240 bool journal_is_rotational = true;
1241
1242 ZTracer::Endpoint trace_endpoint;
1243 void create_logger();
1244 void create_recoverystate_perf();
1245 void tick();
1246 void tick_without_osd_lock();
1247 void _dispatch(Message *m);
1248 void dispatch_op(OpRequestRef op);
1249
1250 void check_osdmap_features(ObjectStore *store);
1251
1252 // asok
1253 friend class OSDSocketHook;
1254 class OSDSocketHook *asok_hook;
1255 bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
1256
1257public:
1258 ClassHandler *class_handler = nullptr;
1259 int get_nodeid() { return whoami; }
1260
1261 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1262 char foo[20];
1263 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1264 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1265 }
1266 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1267 char foo[22];
1268 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1269 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1270 }
1271
1272 static ghobject_t make_snapmapper_oid() {
1273 return ghobject_t(hobject_t(
1274 sobject_t(
1275 object_t("snapmapper"),
1276 0)));
1277 }
1278
1279 static ghobject_t make_pg_log_oid(spg_t pg) {
1280 stringstream ss;
1281 ss << "pglog_" << pg;
1282 string s;
1283 getline(ss, s);
1284 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1285 }
1286
1287 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1288 stringstream ss;
1289 ss << "pginfo_" << pg;
1290 string s;
1291 getline(ss, s);
1292 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1293 }
1294 static ghobject_t make_infos_oid() {
1295 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1296 return ghobject_t(oid);
1297 }
1298 static void recursive_remove_collection(CephContext* cct,
1299 ObjectStore *store,
1300 spg_t pgid,
1301 coll_t tmp);
1302
1303 /**
1304 * get_osd_initial_compat_set()
1305 *
1306 * Get the initial feature set for this OSD. Features
1307 * here are automatically upgraded.
1308 *
1309 * Return value: Initial osd CompatSet
1310 */
1311 static CompatSet get_osd_initial_compat_set();
1312
1313 /**
1314 * get_osd_compat_set()
1315 *
1316 * Get all features supported by this OSD
1317 *
1318 * Return value: CompatSet of all supported features
1319 */
1320 static CompatSet get_osd_compat_set();
1321
1322
1323private:
1324 class C_Tick;
1325 class C_Tick_WithoutOSDLock;
1326
1327 // -- superblock --
1328 OSDSuperblock superblock;
1329
1330 void write_superblock();
1331 void write_superblock(ObjectStore::Transaction& t);
1332 int read_superblock();
1333
1334 void clear_temp_objects();
1335
1336 CompatSet osd_compat;
1337
1338 // -- state --
1339public:
1340 typedef enum {
1341 STATE_INITIALIZING = 1,
1342 STATE_PREBOOT,
1343 STATE_BOOTING,
1344 STATE_ACTIVE,
1345 STATE_STOPPING,
1346 STATE_WAITING_FOR_HEALTHY
1347 } osd_state_t;
1348
1349 static const char *get_state_name(int s) {
1350 switch (s) {
1351 case STATE_INITIALIZING: return "initializing";
1352 case STATE_PREBOOT: return "preboot";
1353 case STATE_BOOTING: return "booting";
1354 case STATE_ACTIVE: return "active";
1355 case STATE_STOPPING: return "stopping";
1356 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1357 default: return "???";
1358 }
1359 }
1360
1361private:
1362 std::atomic_int state{STATE_INITIALIZING};
1363 bool waiting_for_luminous_mons = false;
1364
1365public:
1366 int get_state() const {
1367 return state;
1368 }
1369 void set_state(int s) {
1370 state = s;
1371 }
1372 bool is_initializing() const {
1373 return state == STATE_INITIALIZING;
1374 }
1375 bool is_preboot() const {
1376 return state == STATE_PREBOOT;
1377 }
1378 bool is_booting() const {
1379 return state == STATE_BOOTING;
1380 }
1381 bool is_active() const {
1382 return state == STATE_ACTIVE;
1383 }
1384 bool is_stopping() const {
1385 return state == STATE_STOPPING;
1386 }
1387 bool is_waiting_for_healthy() const {
1388 return state == STATE_WAITING_FOR_HEALTHY;
1389 }
1390
1391private:
1392
1393 ThreadPool peering_tp;
1394 ShardedThreadPool osd_op_tp;
1395 ThreadPool disk_tp;
1396 ThreadPool command_tp;
1397
1398 void set_disk_tp_priority();
1399 void get_latest_osdmap();
1400
1401 // -- sessions --
1402private:
1403 void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
1404 void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1405
1406 Mutex session_waiting_lock;
1407 set<Session*> session_waiting_for_map;
1408
1409 /// Caller assumes refs for included Sessions
1410 void get_sessions_waiting_for_map(set<Session*> *out) {
1411 Mutex::Locker l(session_waiting_lock);
1412 out->swap(session_waiting_for_map);
1413 }
1414 void register_session_waiting_on_map(Session *session) {
1415 Mutex::Locker l(session_waiting_lock);
1416 if (session_waiting_for_map.insert(session).second) {
1417 session->get();
1418 }
1419 }
1420 void clear_session_waiting_on_map(Session *session) {
1421 Mutex::Locker l(session_waiting_lock);
1422 set<Session*>::iterator i = session_waiting_for_map.find(session);
1423 if (i != session_waiting_for_map.end()) {
1424 (*i)->put();
1425 session_waiting_for_map.erase(i);
1426 }
1427 }
1428 void dispatch_sessions_waiting_on_map() {
1429 set<Session*> sessions_to_check;
1430 get_sessions_waiting_for_map(&sessions_to_check);
1431 for (set<Session*>::iterator i = sessions_to_check.begin();
1432 i != sessions_to_check.end();
1433 sessions_to_check.erase(i++)) {
1434 (*i)->session_dispatch_lock.Lock();
1435 dispatch_session_waiting(*i, osdmap);
1436 (*i)->session_dispatch_lock.Unlock();
1437 (*i)->put();
1438 }
1439 }
1440 void session_handle_reset(Session *session) {
1441 Mutex::Locker l(session->session_dispatch_lock);
1442 clear_session_waiting_on_map(session);
1443
1444 session->clear_backoffs();
1445
1446 /* Messages have connection refs, we need to clear the
1447 * connection->session->message->connection
1448 * cycles which result.
1449 * Bug #12338
1450 */
1451 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1452 }
1453
1454private:
1455 /**
1456 * @defgroup monc helpers
1457 * @{
1458 * Right now we only have the one
1459 */
1460
1461 /**
1462 * Ask the Monitors for a sequence of OSDMaps.
1463 *
1464 * @param epoch The epoch to start with when replying
1465 * @param force_request True if this request forces a new subscription to
1466 * the monitors; false if an outstanding request that encompasses it is
1467 * sufficient.
1468 */
1469 void osdmap_subscribe(version_t epoch, bool force_request);
1470 /** @} monc helpers */
1471
1472 Mutex osdmap_subscribe_lock;
1473 epoch_t latest_subscribed_epoch{0};
1474
1475 // -- heartbeat --
1476 /// information about a heartbeat peer
1477 struct HeartbeatInfo {
1478 int peer; ///< peer
1479 ConnectionRef con_front; ///< peer connection (front)
1480 ConnectionRef con_back; ///< peer connection (back)
1481 utime_t first_tx; ///< time we sent our first ping request
1482 utime_t last_tx; ///< last time we sent a ping request
1483 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1484 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1485 epoch_t epoch; ///< most recent epoch we wanted this peer
1486
1487 bool is_unhealthy(utime_t cutoff) const {
1488 return
1489 ! ((last_rx_front > cutoff ||
1490 (last_rx_front == utime_t() && (last_tx == utime_t() ||
1491 first_tx > cutoff))) &&
1492 (last_rx_back > cutoff ||
1493 (last_rx_back == utime_t() && (last_tx == utime_t() ||
1494 first_tx > cutoff))));
1495 }
1496 bool is_healthy(utime_t cutoff) const {
1497 return last_rx_front > cutoff && last_rx_back > cutoff;
1498 }
1499
1500 };
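  /*
   * Sketch of how is_unhealthy() is meant to be applied (illustrative only;
   * the real grace handling lives in heartbeat_check()): a peer counts as
   * unhealthy when either the front or the back channel has gone without a
   * reply past the cutoff, unless we only recently sent our first ping:
   *
   *   utime_t cutoff = ceph_clock_now();
   *   cutoff -= cct->_conf->osd_heartbeat_grace;
   *   for (auto& p : heartbeat_peers) {
   *     if (p.second.is_unhealthy(cutoff)) {
   *       // report p.first as failed to the monitor
   *     }
   *   }
   */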
1501 /// state attached to outgoing heartbeat connections
1502 struct HeartbeatSession : public RefCountedObject {
1503 int peer;
1504 explicit HeartbeatSession(int p) : peer(p) {}
1505 };
1506 Mutex heartbeat_lock;
1507 map<int, int> debug_heartbeat_drops_remaining;
1508 Cond heartbeat_cond;
1509 bool heartbeat_stop;
1510 std::atomic_bool heartbeat_need_update;
1511 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1512 utime_t last_mon_heartbeat;
1513 Messenger *hb_front_client_messenger;
1514 Messenger *hb_back_client_messenger;
1515 Messenger *hb_front_server_messenger;
1516 Messenger *hb_back_server_messenger;
1517 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1518 double daily_loadavg;
1519
1520 void _add_heartbeat_peer(int p);
1521 void _remove_heartbeat_peer(int p);
1522 bool heartbeat_reset(Connection *con);
1523 void maybe_update_heartbeat_peers();
1524 void reset_heartbeat_peers();
1525 bool heartbeat_peers_need_update() {
1526 return heartbeat_need_update.load();
1527 }
1528 void heartbeat_set_peers_need_update() {
1529 heartbeat_need_update.store(true);
1530 }
1531 void heartbeat_clear_peers_need_update() {
1532 heartbeat_need_update.store(false);
1533 }
1534 void heartbeat();
1535 void heartbeat_check();
1536 void heartbeat_entry();
1537 void need_heartbeat_peer_update();
1538
1539 void heartbeat_kick() {
1540 Mutex::Locker l(heartbeat_lock);
1541 heartbeat_cond.Signal();
1542 }
1543
1544 struct T_Heartbeat : public Thread {
1545 OSD *osd;
1546 explicit T_Heartbeat(OSD *o) : osd(o) {}
1547 void *entry() override {
1548 osd->heartbeat_entry();
1549 return 0;
1550 }
1551 } heartbeat_thread;
1552
1553public:
1554 bool heartbeat_dispatch(Message *m);
1555
1556 struct HeartbeatDispatcher : public Dispatcher {
1557 OSD *osd;
1558 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1559
1560 bool ms_can_fast_dispatch_any() const override { return true; }
1561 bool ms_can_fast_dispatch(const Message *m) const override {
1562 switch (m->get_type()) {
1563 case CEPH_MSG_PING:
1564 case MSG_OSD_PING:
1565 return true;
1566 default:
1567 return false;
1568 }
1569 }
1570 void ms_fast_dispatch(Message *m) override {
1571 osd->heartbeat_dispatch(m);
1572 }
1573 bool ms_dispatch(Message *m) override {
1574 return osd->heartbeat_dispatch(m);
1575 }
1576 bool ms_handle_reset(Connection *con) override {
1577 return osd->heartbeat_reset(con);
1578 }
1579 void ms_handle_remote_reset(Connection *con) override {}
1580 bool ms_handle_refused(Connection *con) override {
1581 return osd->ms_handle_refused(con);
1582 }
1583 bool ms_verify_authorizer(Connection *con, int peer_type,
1584 int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1585 bool& isvalid, CryptoKey& session_key,
1586 std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
1587 isvalid = true;
1588 return true;
1589 }
1590 } heartbeat_dispatcher;
1591
1592private:
1593 // -- waiters --
1594 list<OpRequestRef> finished;
1595
1596 void take_waiters(list<OpRequestRef>& ls) {
1597 assert(osd_lock.is_locked());
1598 finished.splice(finished.end(), ls);
1599 }
1600 void do_waiters();
1601
1602 // -- op tracking --
1603 OpTracker op_tracker;
1604 void check_ops_in_flight();
1605 void test_ops(std::string command, std::string args, ostream& ss);
1606 friend class TestOpsSocketHook;
1607 TestOpsSocketHook *test_ops_hook;
1608 friend struct C_CompleteSplits;
1609 friend struct C_OpenPGs;
1610
1611 // -- op queue --
1612 enum class io_queue {
1613 prioritized,
1614 weightedpriority,
1615 mclock_opclass,
1616 mclock_client,
1617 };
1618 friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
1619
1620 const io_queue op_queue;
1621 const unsigned int op_prio_cutoff;
1622
1623 /*
1624 * The ordered op delivery chain is:
1625 *
1626 * fast dispatch -> pqueue back
1627 * pqueue front <-> to_process back
1628 * to_process front -> RunVis(item)
1629 * <- queue_front()
1630 *
1631 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1632 * pushed back up into to_process and/or pqueue while order is preserved.
1633 *
1634 * Multiple worker threads can operate on each shard.
1635 *
1636 * Under normal circumstances, num_running == to_process.size(). There are
1637 * two times when that is not true: (1) when waiting_for_pg == true and
1638 * to_process is accumulating requests that are waiting for the pg to be
1639 * instantiated; in that case they will all get requeued together by
1640 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared
1641 * waiting_for_pg, and already requeued the items.
1642 */
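  /*
   * A sketch of the guard a _process thread applies around the pg-lock
   * acquisition described above (illustrative only; the real logic lives in
   * ShardedOpWQ::_process in OSD.cc, and `slot`/`sdata` are hypothetical
   * locals naming a pg_slot and its ShardData):
   *
   *   uint64_t requeue_seq = slot.requeue_seq;  // read under sdata_op_ordering_lock
   *   sdata->sdata_op_ordering_lock.Unlock();
   *   pg->lock();                               // may block
   *   sdata->sdata_op_ordering_lock.Lock();
   *   if (slot.requeue_seq != requeue_seq) {
   *     // wake_pg_waiters ran meanwhile and requeued our item; drop it here
   *   }
   */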
1643 friend class PGQueueable;
1644
1645 class ShardedOpWQ
1646 : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
1647 {
1648 struct ShardData {
1649 Mutex sdata_lock;
1650 Cond sdata_cond;
1651
1652 Mutex sdata_op_ordering_lock; ///< protects all members below
1653
1654 OSDMapRef waiting_for_pg_osdmap;
1655 struct pg_slot {
1656 PGRef pg; ///< cached pg reference [optional]
1657 list<PGQueueable> to_process; ///< order items for this slot
1658 int num_running = 0; ///< _process threads doing pg lookup/lock
1659
1660 /// true if pg does/did not exist. if so all new items go directly to
1661 /// to_process. cleared by prune_pg_waiters.
1662 bool waiting_for_pg = false;
1663
1664 /// incremented by wake_pg_waiters; indicates racing _process threads
1665 /// should bail out (their op has been requeued)
1666 uint64_t requeue_seq = 0;
1667 };
1668
1669 /// map of slots for each spg_t. maintains ordering of items dequeued
1670 /// from pqueue while _process thread drops shard lock to acquire the
1671 /// pg lock. slots are removed only by prune_pg_waiters.
1672 unordered_map<spg_t,pg_slot> pg_slots;
1673
1674 /// priority queue
1675 std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
1676
1677 void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
1678 unsigned priority = item.second.get_priority();
1679 unsigned cost = item.second.get_cost();
1680 if (priority >= cutoff)
1681 pqueue->enqueue_strict_front(
1682 item.second.get_owner(),
1683 priority, item);
1684 else
1685 pqueue->enqueue_front(
1686 item.second.get_owner(),
1687 priority, cost, item);
1688 }
1689
1690 ShardData(
1691 string lock_name, string ordering_lock,
1692 uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
1693 io_queue opqueue)
1694 : sdata_lock(lock_name.c_str(), false, true, false, cct),
1695 sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
1696 false, cct) {
1697 if (opqueue == io_queue::weightedpriority) {
1698 pqueue = std::unique_ptr
1699 <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1700 new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1701 max_tok_per_prio, min_cost));
1702 } else if (opqueue == io_queue::prioritized) {
1703 pqueue = std::unique_ptr
1704 <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1705 new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1706 max_tok_per_prio, min_cost));
1707 } else if (opqueue == io_queue::mclock_opclass) {
1708 pqueue = std::unique_ptr
1709 <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
1710 } else if (opqueue == io_queue::mclock_client) {
1711 pqueue = std::unique_ptr
1712 <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
1713 }
1714 }
1715 }; // struct ShardData
1716
1717 vector<ShardData*> shard_list;
1718 OSD *osd;
1719 uint32_t num_shards;
1720
1721 public:
1722 ShardedOpWQ(uint32_t pnum_shards,
1723 OSD *o,
1724 time_t ti,
1725 time_t si,
1726 ShardedThreadPool* tp)
1727 : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
1728 osd(o),
1729 num_shards(pnum_shards) {
1730 for (uint32_t i = 0; i < num_shards; i++) {
1731 char lock_name[32] = {0};
1732 snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
1733 char order_lock[32] = {0};
1734 snprintf(order_lock, sizeof(order_lock), "%s.%d",
1735 "OSD:ShardedOpWQ:order:", i);
1736 ShardData* one_shard = new ShardData(
1737 lock_name, order_lock,
1738 osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
1739 osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
1740 shard_list.push_back(one_shard);
1741 }
1742 }
1743 ~ShardedOpWQ() override {
1744 while (!shard_list.empty()) {
1745 delete shard_list.back();
1746 shard_list.pop_back();
1747 }
1748 }
1749
1750 /// wake any pg waiters after a PG is created/instantiated
1751 void wake_pg_waiters(spg_t pgid);
1752
1753 /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
1754 void prune_pg_waiters(OSDMapRef osdmap, int whoami);
1755
1756 /// clear cached PGRef on pg deletion
1757 void clear_pg_pointer(spg_t pgid);
1758
1759 /// clear pg_slots on shutdown
1760 void clear_pg_slots();
1761
1762 /// try to do some work
1763 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1764
1765 /// enqueue a new item
1766 void _enqueue(pair <spg_t, PGQueueable> item) override;
1767
1768 /// requeue an old item (at the front of the line)
1769 void _enqueue_front(pair <spg_t, PGQueueable> item) override;
1770
1771 void return_waiting_threads() override {
1772 for(uint32_t i = 0; i < num_shards; i++) {
1773 ShardData* sdata = shard_list[i];
1774 assert (NULL != sdata);
1775 sdata->sdata_lock.Lock();
1776 sdata->sdata_cond.Signal();
1777 sdata->sdata_lock.Unlock();
1778 }
1779 }
1780
1781 void dump(Formatter *f) {
1782 for(uint32_t i = 0; i < num_shards; i++) {
1783 ShardData* sdata = shard_list[i];
1784 char lock_name[32] = {0};
1785 snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
1786 assert (NULL != sdata);
1787 sdata->sdata_op_ordering_lock.Lock();
1788 f->open_object_section(lock_name);
1789 sdata->pqueue->dump(f);
1790 f->close_section();
1791 sdata->sdata_op_ordering_lock.Unlock();
1792 }
1793 }
1794
1795 /// Must be called on ops in back-to-front queue order (out_ops then ends up front to back)
1796 struct Pred {
1797 spg_t pgid;
1798 list<OpRequestRef> *out_ops;
1799 uint64_t reserved_pushes_to_free;
1800 Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
1801 : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
1802 void accumulate(const PGQueueable &op) {
1803 reserved_pushes_to_free += op.get_reserved_pushes();
1804 if (out_ops) {
1805 boost::optional<OpRequestRef> mop = op.maybe_get_op();
1806 if (mop)
1807 out_ops->push_front(*mop);
1808 }
1809 }
1810 bool operator()(const pair<spg_t, PGQueueable> &op) {
1811 if (op.first == pgid) {
1812 accumulate(op.second);
1813 return true;
1814 } else {
1815 return false;
1816 }
1817 }
1818 uint64_t get_reserved_pushes_to_free() const {
1819 return reserved_pushes_to_free;
1820 }
1821 };
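    /*
     * Usage sketch (illustrative only; `requeued` and `out_ops` are
     * hypothetical): because accumulate() pushes to the front of out_ops, the
     * predicate must see ops from the back of the queue to the front, which
     * leaves out_ops in the original front-to-back order:
     *
     *   list<OpRequestRef> out_ops;
     *   Pred pred(pgid, &out_ops);
     *   for (auto i = requeued.rbegin(); i != requeued.rend(); ++i)
     *     pred(*i);                     // matching ops land at out_ops' front
     *   uint64_t pushes = pred.get_reserved_pushes_to_free();
     */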
1822
1823 bool is_shard_empty(uint32_t thread_index) override {
1824 uint32_t shard_index = thread_index % num_shards;
1825 ShardData* sdata = shard_list[shard_index];
1826 assert(NULL != sdata);
1827 Mutex::Locker l(sdata->sdata_op_ordering_lock);
1828 return sdata->pqueue->empty();
1829 }
1830 } op_shardedwq;
1831
1832
1833 void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
1834 void dequeue_op(
1835 PGRef pg, OpRequestRef op,
1836 ThreadPool::TPHandle &handle);
1837
1838 // -- peering queue --
1839 struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
1840 list<PG*> peering_queue;
1841 OSD *osd;
1842 set<PG*> in_use;
1843 PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
1844 : ThreadPool::BatchWorkQueue<PG>(
1845 "OSD::PeeringWQ", ti, si, tp), osd(o) {}
1846
1847 void _dequeue(PG *pg) override {
1848 for (list<PG*>::iterator i = peering_queue.begin();
1849 i != peering_queue.end();
1850 ) {
1851 if (*i == pg) {
1852 peering_queue.erase(i++);
1853 pg->put("PeeringWQ");
1854 } else {
1855 ++i;
1856 }
1857 }
1858 }
1859 bool _enqueue(PG *pg) override {
1860 pg->get("PeeringWQ");
1861 peering_queue.push_back(pg);
1862 return true;
1863 }
1864 bool _empty() override {
1865 return peering_queue.empty();
1866 }
1867 void _dequeue(list<PG*> *out) override;
1868 void _process(
1869 const list<PG *> &pgs,
1870 ThreadPool::TPHandle &handle) override {
1871 assert(!pgs.empty());
1872 osd->process_peering_events(pgs, handle);
1873 for (list<PG *>::const_iterator i = pgs.begin();
1874 i != pgs.end();
1875 ++i) {
1876 (*i)->put("PeeringWQ");
1877 }
1878 }
1879 void _process_finish(const list<PG *> &pgs) override {
1880 for (list<PG*>::const_iterator i = pgs.begin();
1881 i != pgs.end();
1882 ++i) {
1883 in_use.erase(*i);
1884 }
1885 }
1886 void _clear() override {
1887 assert(peering_queue.empty());
1888 }
1889 } peering_wq;
1890
1891 void process_peering_events(
1892 const list<PG*> &pg,
1893 ThreadPool::TPHandle &handle);
1894
1895 friend class PG;
1896 friend class PrimaryLogPG;
1897
1898
1899 protected:
1900
1901 // -- osd map --
1902 OSDMapRef osdmap;
1903 OSDMapRef get_osdmap() {
1904 return osdmap;
1905 }
224ce89b 1906 epoch_t get_osdmap_epoch() const {
7c673cae
FG
1907 return osdmap ? osdmap->get_epoch() : 0;
1908 }
1909
1910 utime_t had_map_since;
1911 RWLock map_lock;
1912 list<OpRequestRef> waiting_for_osdmap;
1913 deque<utime_t> osd_markdown_log;
1914
1915 friend struct send_map_on_destruct;
1916
1917 void wait_for_new_map(OpRequestRef op);
1918 void handle_osd_map(class MOSDMap *m);
1919 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1920 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1921 void note_down_osd(int osd);
1922 void note_up_osd(int osd);
1923 friend class C_OnMapCommit;
1924
1925 bool advance_pg(
1926 epoch_t advance_to, PG *pg,
1927 ThreadPool::TPHandle &handle,
1928 PG::RecoveryCtx *rctx,
31f18b77 1929 set<PGRef> *split_pgs
7c673cae
FG
1930 );
1931 void consume_map();
1932 void activate_map();
1933
1934 // osd map cache (past osd maps)
1935 OSDMapRef get_map(epoch_t e) {
1936 return service.get_map(e);
1937 }
1938 OSDMapRef add_map(OSDMap *o) {
1939 return service.add_map(o);
1940 }
1941 void add_map_bl(epoch_t e, bufferlist& bl) {
1942 return service.add_map_bl(e, bl);
1943 }
1944 void pin_map_bl(epoch_t e, bufferlist &bl) {
1945 return service.pin_map_bl(e, bl);
1946 }
1947 bool get_map_bl(epoch_t e, bufferlist& bl) {
1948 return service.get_map_bl(e, bl);
1949 }
1950 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1951 return service.add_map_inc_bl(e, bl);
1952 }
1953 void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
1954 return service.pin_map_inc_bl(e, bl);
1955 }
1956
1957protected:
1958 // -- placement groups --
1959 RWLock pg_map_lock; // this lock orders *above* individual PG _locks
1960 ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
1961
3efd9988 1962 std::mutex pending_creates_lock;
b32b8144
FG
1963 using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
1964 std::set<create_from_osd_t> pending_creates_from_osd;
3efd9988
FG
1965 unsigned pending_creates_from_mon = 0;
1966
7c673cae
FG
1967 map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
1968 PGRecoveryStats pg_recovery_stats;
1969
1970 PGPool _get_pool(int id, OSDMapRef createmap);
1971
1972 PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
1973 PG *_lookup_lock_pg(spg_t pgid);
31f18b77
FG
1974
1975public:
1976 PG *lookup_lock_pg(spg_t pgid);
1977
35e4c445
FG
1978 int get_num_pgs() {
1979 RWLock::RLocker l(pg_map_lock);
1980 return pg_map.size();
1981 }
1982
31f18b77 1983protected:
7c673cae
FG
1984 PG *_open_lock_pg(OSDMapRef createmap,
1985 spg_t pg, bool no_lockdep_check=false);
1986 enum res_result {
1987 RES_PARENT, // resurrected a parent
1988 RES_SELF, // resurrected self
1989 RES_NONE // no relevant deleting pg to resurrect
1990 };
1991 res_result _try_resurrect_pg(
1992 OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
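// Hedged sketch of how a caller might branch on the result (the real
// handling lives in the peering-event path in OSD.cc):
//
//   spg_t resurrected;
//   PGRef old_pg_state;
//   switch (_try_resurrect_pg(curmap, pgid, &resurrected, &old_pg_state)) {
//   case RES_SELF:   /* reuse the still-deleting pg for pgid itself  */ break;
//   case RES_PARENT: /* reuse a deleting parent and split out pgid   */ break;
//   case RES_NONE:   /* nothing to reuse; create the pg from scratch */ break;
//   }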
1993
1994 PG *_create_lock_pg(
1995 OSDMapRef createmap,
1996 spg_t pgid,
1997 bool hold_map_lock,
1998 bool backfill,
1999 int role,
2000 vector<int>& up, int up_primary,
2001 vector<int>& acting, int acting_primary,
2002 pg_history_t history,
2003 const PastIntervals& pi,
2004 ObjectStore::Transaction& t);
2005
2006 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
2007 void add_newly_split_pg(PG *pg,
2008 PG::RecoveryCtx *rctx);
2009
2010 int handle_pg_peering_evt(
2011 spg_t pgid,
2012 const pg_history_t& orig_history,
2013 const PastIntervals& pi,
2014 epoch_t epoch,
2015 PG::CephPeeringEvtRef evt);
3efd9988
FG
2016 bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
2017 void resume_creating_pg();
2018
7c673cae
FG
2019 void load_pgs();
2020 void build_past_intervals_parallel();
2021
2022 /// build initial pg history and intervals on create
2023 void build_initial_pg_history(
2024 spg_t pgid,
2025 epoch_t created,
2026 utime_t created_stamp,
2027 pg_history_t *h,
2028 PastIntervals *pi);
2029
2030 /// project pg history from epoch 'from' to now
2031 bool project_pg_history(
2032 spg_t pgid, pg_history_t& h, epoch_t from,
2033 const vector<int>& lastup,
2034 int lastupprimary,
2035 const vector<int>& lastacting,
2036 int lastactingprimary
2037 ); ///< @return false if there was a map gap between from and now
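// Hedged usage note: a false return means the history could not be projected
// because intervening maps are missing, e.g. (sketch only, not copied from
// OSD.cc; 'op', 'up', 'acting' etc. are hypothetical locals):
//
//   pg_history_t h = orig_history;
//   if (!project_pg_history(pgid, h, epoch, up, up_primary,
//                           acting, acting_primary)) {
//     wait_for_new_map(op);   // re-handle once the missing maps arrive
//     return;
//   }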
2038
2039 // this must be called with pg->lock held on any pg addition to pg_map
2040 void wake_pg_waiters(PGRef pg) {
2041 assert(pg->is_locked());
2042 op_shardedwq.wake_pg_waiters(pg->info.pgid);
2043 }
2044 epoch_t last_pg_create_epoch;
2045
2046 void handle_pg_create(OpRequestRef op);
2047
2048 void split_pgs(
2049 PG *parent,
31f18b77 2050 const set<spg_t> &childpgids, set<PGRef> *out_pgs,
7c673cae
FG
2051 OSDMapRef curmap,
2052 OSDMapRef nextmap,
2053 PG::RecoveryCtx *rctx);
2054
2055 // == monitor interaction ==
2056 Mutex mon_report_lock;
2057 utime_t last_mon_report;
2058 utime_t last_pg_stats_sent;
2059
2060 /* if our monitor dies, we want to notice it and reconnect.
2061 * So we keep track of when it last acked our stat updates,
2062 * and if too much time passes (while we are still sending
2063 * more updates) we declare it dead and reconnect to a
2064 * different monitor.
2065 */
2066 utime_t last_pg_stats_ack;
2067 float stats_ack_timeout;
2068 set<uint64_t> outstanding_pg_stats; // tids of stat updates that haven't been acked yet
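// Hedged sketch of the liveness check this state supports; the real logic
// lives in OSD.cc, and 'now' / the reconnect call below are placeholders:
//
//   if (!outstanding_pg_stats.empty() &&
//       now - last_pg_stats_ack > stats_ack_timeout) {
//     // monitor has gone quiet: drop the session and pick another mon
//     monc->reopen_session();   // hypothetical call, shown for illustration
//   }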
2069
2070 // -- boot --
2071 void start_boot();
2072 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2073 void _preboot(epoch_t oldest, epoch_t newest);
2074 void _send_boot();
2075 void _collect_metadata(map<string,string> *pmeta);
2076
2077 void start_waiting_for_healthy();
2078 bool _is_healthy();
2079
2080 void send_full_update();
2081
2082 friend struct C_OSD_GetVersion;
2083
2084 // -- alive --
2085 epoch_t up_thru_wanted;
2086
2087 void queue_want_up_thru(epoch_t want);
2088 void send_alive();
2089
2090 // -- full map requests --
2091 epoch_t requested_full_first, requested_full_last;
2092
2093 void request_full_map(epoch_t first, epoch_t last);
2094 void rerequest_full_maps() {
2095 epoch_t first = requested_full_first;
2096 epoch_t last = requested_full_last;
2097 requested_full_first = 0;
2098 requested_full_last = 0;
2099 request_full_map(first, last);
2100 }
2101 void got_full_map(epoch_t e);
2102
2103 // -- failures --
2104 map<int,utime_t> failure_queue;
2105 map<int,pair<utime_t,entity_inst_t> > failure_pending;
2106
2107 void requeue_failures();
2108 void send_failures();
2109 void send_still_alive(epoch_t epoch, const entity_inst_t &i);
2110
2111 // -- pg stats --
2112 Mutex pg_stat_queue_lock;
2113 Cond pg_stat_queue_cond;
2114 xlist<PG*> pg_stat_queue;
2115 bool osd_stat_updated;
2116 uint64_t pg_stat_tid, pg_stat_tid_flushed;
2117
2118 void send_pg_stats(const utime_t &now);
2119 void handle_pg_stats_ack(class MPGStatsAck *ack);
2120 void flush_pg_stats();
2121
2122 ceph::coarse_mono_clock::time_point last_sent_beacon;
2123 Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2124 epoch_t min_last_epoch_clean = 0;
2125 // which pgs were scanned for min_lec
2126 std::vector<pg_t> min_last_epoch_clean_pgs;
2127 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2128
2129 void pg_stat_queue_enqueue(PG *pg) {
2130 pg_stat_queue_lock.Lock();
2131 if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
2132 pg->get("pg_stat_queue");
2133 pg_stat_queue.push_back(&pg->stat_queue_item);
2134 }
2135 osd_stat_updated = true;
2136 pg_stat_queue_lock.Unlock();
2137 }
2138 void pg_stat_queue_dequeue(PG *pg) {
2139 pg_stat_queue_lock.Lock();
2140 if (pg->stat_queue_item.remove_myself())
2141 pg->put("pg_stat_queue");
2142 pg_stat_queue_lock.Unlock();
2143 }
2144 void clear_pg_stat_queue() {
2145 pg_stat_queue_lock.Lock();
2146 while (!pg_stat_queue.empty()) {
2147 PG *pg = pg_stat_queue.front();
2148 pg_stat_queue.pop_front();
2149 pg->put("pg_stat_queue");
2150 }
2151 pg_stat_queue_lock.Unlock();
2152 }
224ce89b
WB
2153 void clear_outstanding_pg_stats() {
2154 Mutex::Locker l(pg_stat_queue_lock);
2155 outstanding_pg_stats.clear();
2156 }
7c673cae
FG
2157
2158 ceph_tid_t get_tid() {
2159 return service.get_tid();
2160 }
2161
2162 // -- generic pg peering --
2163 PG::RecoveryCtx create_context();
2164 void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2165 ThreadPool::TPHandle *handle = NULL);
2166 void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2167 ThreadPool::TPHandle *handle = NULL);
2168 void do_notifies(map<int,
2169 vector<pair<pg_notify_t, PastIntervals> > >&
2170 notify_list,
2171 OSDMapRef map);
2172 void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2173 OSDMapRef map);
2174 void do_infos(map<int,
2175 vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2176 OSDMapRef map);
2177
2178 bool require_mon_peer(const Message *m);
2179 bool require_mon_or_mgr_peer(const Message *m);
2180 bool require_osd_peer(const Message *m);
2181 /**
2182 * Verifies that we were alive in the given epoch, and that
2183 * we still are.
2184 */
2185 bool require_self_aliveness(const Message *m, epoch_t alive_since);
2186 /**
2187 * Verifies that the OSD who sent the given op has the same
2188 * address as in the given map.
2189 * @pre op was sent by an OSD using the cluster messenger
2190 */
2191 bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2192 bool is_fast_dispatch);
2193
2194 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2195 bool is_fast_dispatch);
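// Illustrative ordering of the checks above inside a message handler
// (hypothetical handler body; how the epoch is extracted depends on the
// concrete message type):
//
//   void OSD::handle_pg_foo(OpRequestRef op) {
//     const Message *m = op->get_req();
//     if (!require_osd_peer(m))
//       return;                    // drop: not from another OSD
//     epoch_t e = /* epoch carried by the message */;
//     if (!require_same_or_newer_map(op, e, false))
//       return;                    // op waits until that map arrives
//     // ... actual handling ...
//   }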
2196
2197 void handle_pg_query(OpRequestRef op);
2198 void handle_pg_notify(OpRequestRef op);
2199 void handle_pg_log(OpRequestRef op);
2200 void handle_pg_info(OpRequestRef op);
2201 void handle_pg_trim(OpRequestRef op);
2202
2203 void handle_pg_backfill_reserve(OpRequestRef op);
2204 void handle_pg_recovery_reserve(OpRequestRef op);
2205
c07f9fc5
FG
2206 void handle_force_recovery(Message *m);
2207
7c673cae
FG
2208 void handle_pg_remove(OpRequestRef op);
2209 void _remove_pg(PG *pg);
2210
2211 // -- commands --
2212 struct Command {
2213 vector<string> cmd;
2214 ceph_tid_t tid;
2215 bufferlist indata;
2216 ConnectionRef con;
2217
2218 Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2219 : cmd(c), tid(t), indata(bl), con(co) {}
2220 };
2221 list<Command*> command_queue;
2222 struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2223 OSD *osd;
2224 CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2225 : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2226
2227 bool _empty() override {
2228 return osd->command_queue.empty();
2229 }
2230 bool _enqueue(Command *c) override {
2231 osd->command_queue.push_back(c);
2232 return true;
2233 }
2234 void _dequeue(Command *c) override {
2235 ceph_abort();
2236 }
2237 Command *_dequeue() override {
2238 if (osd->command_queue.empty())
2239 return NULL;
2240 Command *c = osd->command_queue.front();
2241 osd->command_queue.pop_front();
2242 return c;
2243 }
2244 void _process(Command *c, ThreadPool::TPHandle &) override {
2245 osd->osd_lock.Lock();
2246 if (osd->is_stopping()) {
2247 osd->osd_lock.Unlock();
2248 delete c;
2249 return;
2250 }
2251 osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
2252 osd->osd_lock.Unlock();
2253 delete c;
2254 }
2255 void _clear() override {
2256 while (!osd->command_queue.empty()) {
2257 Command *c = osd->command_queue.front();
2258 osd->command_queue.pop_front();
2259 delete c;
2260 }
2261 }
2262 } command_wq;
2263
2264 void handle_command(class MMonCommand *m);
2265 void handle_command(class MCommand *m);
2266 void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
2267
2268 // -- pg recovery --
2269 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2270 ThreadPool::TPHandle &handle);
2271
2272
2273 // -- scrubbing --
2274 void sched_scrub();
2275 bool scrub_random_backoff();
2276 bool scrub_load_below_threshold();
2277 bool scrub_time_permit(utime_t now);
2278
2279 // -- removing --
2280 struct RemoveWQ :
2281 public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
2282 CephContext* cct;
2283 ObjectStore *&store;
2284 list<pair<PGRef, DeletingStateRef> > remove_queue;
2285 RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
2286 ThreadPool *tp)
2287 : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
2288 "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
2289
2290 bool _empty() override {
2291 return remove_queue.empty();
2292 }
2293 void _enqueue(pair<PGRef, DeletingStateRef> item) override {
2294 remove_queue.push_back(item);
2295 }
2296 void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
2297 remove_queue.push_front(item);
2298 }
2299 bool _dequeue(pair<PGRef, DeletingStateRef> item) {
2300 ceph_abort();
2301 }
2302 pair<PGRef, DeletingStateRef> _dequeue() override {
2303 assert(!remove_queue.empty());
2304 pair<PGRef, DeletingStateRef> item = remove_queue.front();
2305 remove_queue.pop_front();
2306 return item;
2307 }
2308 void _process(pair<PGRef, DeletingStateRef>,
2309 ThreadPool::TPHandle &) override;
2310 void _clear() override {
2311 remove_queue.clear();
2312 }
94b18763
FG
2313 int get_remove_queue_len() {
2314 lock();
2315 int r = remove_queue.size();
2316 unlock();
2317 return r;
2318 }
7c673cae
FG
2319 } remove_wq;
2320
b32b8144
FG
2321 // -- status reporting --
2322 MPGStats *collect_pg_stats();
2323 std::vector<OSDHealthMetric> get_health_metrics();
2324
224ce89b 2325private:
7c673cae
FG
2326 bool ms_can_fast_dispatch_any() const override { return true; }
2327 bool ms_can_fast_dispatch(const Message *m) const override {
2328 switch (m->get_type()) {
2329 case CEPH_MSG_OSD_OP:
2330 case CEPH_MSG_OSD_BACKOFF:
2331 case MSG_OSD_SUBOP:
2332 case MSG_OSD_REPOP:
2333 case MSG_OSD_SUBOPREPLY:
2334 case MSG_OSD_REPOPREPLY:
2335 case MSG_OSD_PG_PUSH:
2336 case MSG_OSD_PG_PULL:
2337 case MSG_OSD_PG_PUSH_REPLY:
2338 case MSG_OSD_PG_SCAN:
2339 case MSG_OSD_PG_BACKFILL:
2340 case MSG_OSD_PG_BACKFILL_REMOVE:
2341 case MSG_OSD_EC_WRITE:
2342 case MSG_OSD_EC_WRITE_REPLY:
2343 case MSG_OSD_EC_READ:
2344 case MSG_OSD_EC_READ_REPLY:
2345 case MSG_OSD_SCRUB_RESERVE:
2346 case MSG_OSD_REP_SCRUB:
2347 case MSG_OSD_REP_SCRUBMAP:
2348 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2349 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
c07f9fc5
FG
2350 case MSG_OSD_PG_RECOVERY_DELETE:
2351 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
7c673cae
FG
2352 return true;
2353 default:
2354 return false;
2355 }
2356 }
2357 void ms_fast_dispatch(Message *m) override;
2358 void ms_fast_preprocess(Message *m) override;
2359 bool ms_dispatch(Message *m) override;
2360 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
2361 bool ms_verify_authorizer(Connection *con, int peer_type,
2362 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
28e407b8
AA
2363 bool& isvalid, CryptoKey& session_key,
2364 std::unique_ptr<AuthAuthorizerChallenge> *challenge) override;
7c673cae
FG
2365 void ms_handle_connect(Connection *con) override;
2366 void ms_handle_fast_connect(Connection *con) override;
2367 void ms_handle_fast_accept(Connection *con) override;
2368 bool ms_handle_reset(Connection *con) override;
2369 void ms_handle_remote_reset(Connection *con) override {}
2370 bool ms_handle_refused(Connection *con) override;
2371
2372 io_queue get_io_queue() const {
2373 if (cct->_conf->osd_op_queue == "debug_random") {
224ce89b
WB
2374 static io_queue index_lookup[] = { io_queue::prioritized,
2375 io_queue::weightedpriority,
2376 io_queue::mclock_opclass,
2377 io_queue::mclock_client };
7c673cae 2378 srand(time(NULL));
224ce89b
WB
2379 unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
2380 return index_lookup[which];
d2e6a577
FG
2381 } else if (cct->_conf->osd_op_queue == "prioritized") {
2382 return io_queue::prioritized;
224ce89b
WB
2383 } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
2384 return io_queue::mclock_opclass;
2385 } else if (cct->_conf->osd_op_queue == "mclock_client") {
2386 return io_queue::mclock_client;
7c673cae 2387 } else {
d2e6a577
FG
2388 // default / catch-all is 'wpq'
2389 return io_queue::weightedpriority;
7c673cae
FG
2390 }
2391 }
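// Example (hedged) ceph.conf snippet exercising the branches above; any
// other value falls through to the 'wpq' default:
//
//   [osd]
//   osd op queue = wpq           # or prioritized, mclock_opclass,
//                                #    mclock_client, debug_random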
2392
2393 unsigned int get_io_prio_cut() const {
2394 if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2395 srand(time(NULL));
2396 return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
d2e6a577 2397 } else if (cct->_conf->osd_op_queue_cut_off == "high") {
7c673cae 2398 return CEPH_MSG_PRIO_HIGH;
d2e6a577
FG
2399 } else {
2400 // default / catch-all is 'low'
2401 return CEPH_MSG_PRIO_LOW;
7c673cae
FG
2402 }
2403 }
2404
2405 public:
2406 /* internal and external can point to the same messenger; they will still
2407 * be cleaned up properly */
2408 OSD(CephContext *cct_,
2409 ObjectStore *store_,
2410 int id,
2411 Messenger *internal,
2412 Messenger *external,
2413 Messenger *hb_front_client,
2414 Messenger *hb_back_client,
2415 Messenger *hb_front_server,
2416 Messenger *hb_back_server,
2417 Messenger *osdc_messenger,
2418 MonClient *mc, const std::string &dev, const std::string &jdev);
2419 ~OSD() override;
2420
2421 // static bits
2422 static int mkfs(CephContext *cct, ObjectStore *store,
2423 const string& dev,
2424 uuid_d fsid, int whoami);
2425 /* remove any non-user xattrs from an xattr map */
2426 void filter_xattrs(map<string, bufferptr>& attrs) {
2427 for (map<string, bufferptr>::iterator iter = attrs.begin();
2428 iter != attrs.end();
2429 ) {
2430 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2431 attrs.erase(iter++);
2432 else ++iter;
2433 }
2434 }
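// Example of the filter's effect (hypothetical keys, shown for illustration):
//   { "_user.rgw.acl": ..., "_": ..., "snapset": ... }
//     --> { "_user.rgw.acl": ... }   // only multi-char, '_'-prefixed names survive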
2435
2436private:
2437 int mon_cmd_maybe_osd_create(string &cmd);
2438 int update_crush_device_class();
2439 int update_crush_location();
2440
3efd9988
FG
2441 static int write_meta(CephContext *cct,
2442 ObjectStore *store,
7c673cae
FG
2443 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2444
2445 void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
2446 void handle_scrub(struct MOSDScrub *m);
2447 void handle_osd_ping(class MOSDPing *m);
2448
2449 int init_op_flags(OpRequestRef& op);
2450
31f18b77
FG
2451 int get_num_op_shards();
2452 int get_num_op_threads();
2453
c07f9fc5
FG
2454 float get_osd_recovery_sleep();
2455
7c673cae
FG
2456public:
2457 static int peek_meta(ObjectStore *store, string& magic,
2458 uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
2459
2460
2461 // startup/shutdown
2462 int pre_init();
2463 int init();
2464 void final_init();
2465
2466 int enable_disable_fuse(bool stop);
2467
2468 void suicide(int exitcode);
2469 int shutdown();
2470
2471 void handle_signal(int signum);
2472
2473 /// check if we can throw out op from a disconnected client
2474 static bool op_is_discardable(const MOSDOp *m);
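// Hedged sketch of how dispatch code might use this check (the real call
// sites are elsewhere in the OSD/PG code):
//
//   if (op_is_discardable(m)) {
//     // the client connection is gone and the op need not be replayed,
//     // so drop it instead of queueing work for it
//     return;
//   }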
2475
2476public:
2477 OSDService service;
2478 friend class OSDService;
2479};
2480
224ce89b
WB
2481
2482std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
2483
2484
7c673cae
FG
2485 // compatibility of the executable
2486extern const CompatSet::Feature ceph_osd_feature_compat[];
2487extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2488extern const CompatSet::Feature ceph_osd_feature_incompat[];
2489
224ce89b 2490#endif // CEPH_OSD_H