1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OSD_H
16#define CEPH_OSD_H
17
18#include "PG.h"
19
20#include "msg/Dispatcher.h"
21
22#include "common/Mutex.h"
23#include "common/RWLock.h"
24#include "common/Timer.h"
25#include "common/WorkQueue.h"
26#include "common/AsyncReserver.h"
27#include "common/ceph_context.h"
28#include "common/zipkin_trace.h"
29
30#include "mgr/MgrClient.h"
31
32#include "os/ObjectStore.h"
33#include "OSDCap.h"
34
35#include "auth/KeyRing.h"
36#include "osd/ClassHandler.h"
37
38#include "include/CompatSet.h"
39
40#include "OpRequest.h"
41#include "Session.h"
42
43#include "osd/PGQueueable.h"
44
45#include <atomic>
46#include <map>
47#include <memory>
48#include "include/memory.h"
49using namespace std;
50
51#include "include/unordered_map.h"
52
53#include "common/shared_cache.hpp"
54#include "common/simple_cache.hpp"
55#include "common/sharedptr_registry.hpp"
56#include "common/WeightedPriorityQueue.h"
57#include "common/PrioritizedQueue.h"
58#include "osd/mClockOpClassQueue.h"
59#include "osd/mClockClientQueue.h"
60#include "messages/MOSDOp.h"
61#include "include/Spinlock.h"
62#include "common/EventTrace.h"
63
64#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
65
66
67enum {
68 l_osd_first = 10000,
69 l_osd_op_wip,
70 l_osd_op,
71 l_osd_op_inb,
72 l_osd_op_outb,
73 l_osd_op_lat,
74 l_osd_op_process_lat,
75 l_osd_op_prepare_lat,
76 l_osd_op_r,
77 l_osd_op_r_outb,
78 l_osd_op_r_lat,
79 l_osd_op_r_lat_outb_hist,
80 l_osd_op_r_process_lat,
81 l_osd_op_r_prepare_lat,
82 l_osd_op_w,
83 l_osd_op_w_inb,
84 l_osd_op_w_lat,
85 l_osd_op_w_lat_inb_hist,
86 l_osd_op_w_process_lat,
87 l_osd_op_w_prepare_lat,
88 l_osd_op_rw,
89 l_osd_op_rw_inb,
90 l_osd_op_rw_outb,
91 l_osd_op_rw_lat,
92 l_osd_op_rw_lat_inb_hist,
93 l_osd_op_rw_lat_outb_hist,
94 l_osd_op_rw_process_lat,
95 l_osd_op_rw_prepare_lat,
96
97 l_osd_op_before_queue_op_lat,
98 l_osd_op_before_dequeue_op_lat,
99
100 l_osd_sop,
101 l_osd_sop_inb,
102 l_osd_sop_lat,
103 l_osd_sop_w,
104 l_osd_sop_w_inb,
105 l_osd_sop_w_lat,
106 l_osd_sop_pull,
107 l_osd_sop_pull_lat,
108 l_osd_sop_push,
109 l_osd_sop_push_inb,
110 l_osd_sop_push_lat,
111
112 l_osd_pull,
113 l_osd_push,
114 l_osd_push_outb,
115
116 l_osd_rop,
117
118 l_osd_loadavg,
119 l_osd_buf,
120 l_osd_history_alloc_bytes,
121 l_osd_history_alloc_num,
122 l_osd_cached_crc,
123 l_osd_cached_crc_adjusted,
124 l_osd_missed_crc,
125
126 l_osd_pg,
127 l_osd_pg_primary,
128 l_osd_pg_replica,
129 l_osd_pg_stray,
130 l_osd_hb_to,
131 l_osd_map,
132 l_osd_mape,
133 l_osd_mape_dup,
134
135 l_osd_waiting_for_map,
136
137 l_osd_map_cache_hit,
138 l_osd_map_cache_miss,
139 l_osd_map_cache_miss_low,
140 l_osd_map_cache_miss_low_avg,
141 l_osd_map_bl_cache_hit,
142 l_osd_map_bl_cache_miss,
143
144 l_osd_stat_bytes,
145 l_osd_stat_bytes_used,
146 l_osd_stat_bytes_avail,
147
148 l_osd_copyfrom,
149
150 l_osd_tier_promote,
151 l_osd_tier_flush,
152 l_osd_tier_flush_fail,
153 l_osd_tier_try_flush,
154 l_osd_tier_try_flush_fail,
155 l_osd_tier_evict,
156 l_osd_tier_whiteout,
157 l_osd_tier_dirty,
158 l_osd_tier_clean,
159 l_osd_tier_delay,
160 l_osd_tier_proxy_read,
161 l_osd_tier_proxy_write,
162
163 l_osd_agent_wake,
164 l_osd_agent_skip,
165 l_osd_agent_flush,
166 l_osd_agent_evict,
167
168 l_osd_object_ctx_cache_hit,
169 l_osd_object_ctx_cache_total,
170
171 l_osd_op_cache_hit,
172 l_osd_tier_flush_lat,
173 l_osd_tier_promote_lat,
174 l_osd_tier_r_lat,
175
176 l_osd_pg_info,
177 l_osd_pg_fastinfo,
178 l_osd_pg_biginfo,
179
180 l_osd_last,
181};
182
183// RecoveryState perf counters
184enum {
185 rs_first = 20000,
186 rs_initial_latency,
187 rs_started_latency,
188 rs_reset_latency,
189 rs_start_latency,
190 rs_primary_latency,
191 rs_peering_latency,
192 rs_backfilling_latency,
193 rs_waitremotebackfillreserved_latency,
194 rs_waitlocalbackfillreserved_latency,
195 rs_notbackfilling_latency,
196 rs_repnotrecovering_latency,
197 rs_repwaitrecoveryreserved_latency,
198 rs_repwaitbackfillreserved_latency,
199 rs_reprecovering_latency,
200 rs_activating_latency,
201 rs_waitlocalrecoveryreserved_latency,
202 rs_waitremoterecoveryreserved_latency,
203 rs_recovering_latency,
204 rs_recovered_latency,
205 rs_clean_latency,
206 rs_active_latency,
207 rs_replicaactive_latency,
208 rs_stray_latency,
209 rs_getinfo_latency,
210 rs_getlog_latency,
211 rs_waitactingchange_latency,
212 rs_incomplete_latency,
213 rs_down_latency,
214 rs_getmissing_latency,
215 rs_waitupthru_latency,
216 rs_notrecovering_latency,
217 rs_last,
218};
219
220class Messenger;
221class Message;
222class MonClient;
223class PerfCounters;
224class ObjectStore;
225class FuseStore;
226class OSDMap;
227class MLog;
228class Objecter;
229
230class Watch;
231class PrimaryLogPG;
232
233class AuthAuthorizeHandlerRegistry;
234
235class TestOpsSocketHook;
236struct C_CompleteSplits;
237struct C_OpenPGs;
238class LogChannel;
239class CephContext;
240typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
241class MOSDOp;
242
243class DeletingState {
244 Mutex lock;
245 Cond cond;
246 enum {
247 QUEUED,
248 CLEARING_DIR,
249 CLEARING_WAITING,
250 DELETING_DIR,
251 DELETED_DIR,
252 CANCELED,
253 } status;
254 bool stop_deleting;
255public:
256 const spg_t pgid;
257 const PGRef old_pg_state;
258 explicit DeletingState(const pair<spg_t, PGRef> &in) :
259 lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
260 pgid(in.first), old_pg_state(in.second) {
261 }
262
263 /// transition status to CLEARING_WAITING
264 bool pause_clearing() {
265 Mutex::Locker l(lock);
266 assert(status == CLEARING_DIR);
267 if (stop_deleting) {
268 status = CANCELED;
269 cond.Signal();
270 return false;
271 }
272 status = CLEARING_WAITING;
273 return true;
274 } ///< @return false if we should cancel deletion
275
276 /// start or resume the clearing - transition the status to CLEARING_DIR
277 bool start_or_resume_clearing() {
278 Mutex::Locker l(lock);
279 assert(
280 status == QUEUED ||
281 status == DELETED_DIR ||
282 status == CLEARING_WAITING);
283 if (stop_deleting) {
284 status = CANCELED;
285 cond.Signal();
286 return false;
287 }
288 status = CLEARING_DIR;
289 return true;
290 } ///< @return false if we should cancel the deletion
291
292 /// transition status to CLEARING_DIR
293 bool resume_clearing() {
294 Mutex::Locker l(lock);
295 assert(status == CLEARING_WAITING);
296 if (stop_deleting) {
297 status = CANCELED;
298 cond.Signal();
299 return false;
300 }
301 status = CLEARING_DIR;
302 return true;
303 } ///< @return false if we should cancel deletion
304
305 /// transition status to deleting
306 bool start_deleting() {
307 Mutex::Locker l(lock);
308 assert(status == CLEARING_DIR);
309 if (stop_deleting) {
310 status = CANCELED;
311 cond.Signal();
312 return false;
313 }
314 status = DELETING_DIR;
315 return true;
316 } ///< @return false if we should cancel deletion
317
318 /// signal collection removal queued
319 void finish_deleting() {
320 Mutex::Locker l(lock);
321 assert(status == DELETING_DIR);
322 status = DELETED_DIR;
323 cond.Signal();
324 }
325
326 /// try to halt the deletion
327 bool try_stop_deletion() {
328 Mutex::Locker l(lock);
329 stop_deleting = true;
330 /**
331 * If we are in DELETING_DIR or CLEARING_DIR, there are in-progress
332 * operations we have to wait for before continuing. States
333 * CLEARING_WAITING and QUEUED indicate that the remover will check
334 * stop_deleting before queueing any further operations. CANCELED
335 * indicates that the remover has already halted. DELETED_DIR
336 * indicates that the deletion has been fully queued.
337 */
338 while (status == DELETING_DIR || status == CLEARING_DIR)
339 cond.Wait(lock);
340 return status != DELETED_DIR;
341 } ///< @return true if we don't need to recreate the collection
342};
343typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
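// Illustrative only (not part of the original header): how a PG-removal
// worker might drive the DeletingState transitions asserted above.  The
// surrounding worker code and the "d" variable are assumptions for this
// sketch, not names taken from this file.
//
//   DeletingStateRef d = ...;               // obtained via deleting_pgs
//   if (!d->start_or_resume_clearing())     // QUEUED -> CLEARING_DIR
//     return;                               // deletion was canceled
//   // ... remove a batch of objects ...
//   if (!d->pause_clearing())               // CLEARING_DIR -> CLEARING_WAITING
//     return;
//   if (!d->resume_clearing())              // CLEARING_WAITING -> CLEARING_DIR
//     return;
//   if (!d->start_deleting())               // CLEARING_DIR -> DELETING_DIR
//     return;
//   // ... queue the collection removal ...
//   d->finish_deleting();                   // DELETING_DIR -> DELETED_DIR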
344
345class OSD;
346
347class OSDService {
348public:
349 OSD *osd;
350 CephContext *cct;
351 SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
352 ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
353 SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
354 const int whoami;
355 ObjectStore *&store;
356 LogClient &log_client;
357 LogChannelRef clog;
358 PGRecoveryStats &pg_recovery_stats;
359private:
360 Messenger *&cluster_messenger;
361 Messenger *&client_messenger;
362public:
363 PerfCounters *&logger;
364 PerfCounters *&recoverystate_perf;
365 MonClient *&monc;
366 ThreadPool::BatchWorkQueue<PG> &peering_wq;
367 GenContextWQ recovery_gen_wq;
368 ClassHandler *&class_handler;
369
370 void enqueue_back(spg_t pgid, PGQueueable qi);
371 void enqueue_front(spg_t pgid, PGQueueable qi);
372
373 void maybe_inject_dispatch_delay() {
374 if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
375 if (rand() % 10000 <
376 g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
377 utime_t t;
378 t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
379 t.sleep();
380 }
381 }
382 }
383
384private:
385 // -- map epoch lower bound --
386 Mutex pg_epoch_lock;
387 multiset<epoch_t> pg_epochs;
388 map<spg_t,epoch_t> pg_epoch;
389
390public:
391 void pg_add_epoch(spg_t pgid, epoch_t epoch) {
392 Mutex::Locker l(pg_epoch_lock);
393 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
394 assert(t == pg_epoch.end());
395 pg_epoch[pgid] = epoch;
396 pg_epochs.insert(epoch);
397 }
398 void pg_update_epoch(spg_t pgid, epoch_t epoch) {
399 Mutex::Locker l(pg_epoch_lock);
400 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
401 assert(t != pg_epoch.end());
402 pg_epochs.erase(pg_epochs.find(t->second));
403 t->second = epoch;
404 pg_epochs.insert(epoch);
405 }
406 void pg_remove_epoch(spg_t pgid) {
407 Mutex::Locker l(pg_epoch_lock);
408 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
409 if (t != pg_epoch.end()) {
410 pg_epochs.erase(pg_epochs.find(t->second));
411 pg_epoch.erase(t);
412 }
413 }
414 epoch_t get_min_pg_epoch() {
415 Mutex::Locker l(pg_epoch_lock);
416 if (pg_epochs.empty())
417 return 0;
418 else
419 return *pg_epochs.begin();
420 }
421
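// A small illustration of the bookkeeping above (hypothetical values; the
// "svc" handle and pg1..pg3 are assumed): pg_epochs is a multiset so
// duplicate epochs from different PGs are counted separately, and
// get_min_pg_epoch() yields the oldest epoch any registered PG is still
// working from.
//
//   svc.pg_add_epoch(pg1, 40);
//   svc.pg_add_epoch(pg2, 42);
//   svc.pg_add_epoch(pg3, 42);
//   // get_min_pg_epoch() == 40
//   svc.pg_update_epoch(pg1, 43);   // erases one copy of 40, inserts 43
//   // get_min_pg_epoch() == 42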
422private:
423 // -- superblock --
424 Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
425 OSDSuperblock superblock;
426
427public:
428 OSDSuperblock get_superblock() {
429 Mutex::Locker l(publish_lock);
430 return superblock;
431 }
432 void publish_superblock(const OSDSuperblock &block) {
433 Mutex::Locker l(publish_lock);
434 superblock = block;
435 }
436
437 int get_nodeid() const { return whoami; }
438
439 std::atomic<epoch_t> max_oldest_map;
440private:
441 OSDMapRef osdmap;
442
443public:
444 OSDMapRef get_osdmap() {
445 Mutex::Locker l(publish_lock);
446 return osdmap;
447 }
448 epoch_t get_osdmap_epoch() {
449 Mutex::Locker l(publish_lock);
450 return osdmap ? osdmap->get_epoch() : 0;
451 }
452 void publish_map(OSDMapRef map) {
453 Mutex::Locker l(publish_lock);
454 osdmap = map;
455 }
456
457 /*
458 * osdmap - current published map
459 * next_osdmap - pre-published map that is about to be published.
460 *
461 * We use the next_osdmap to send messages and initiate connections,
462 * but only if the target is the same instance as the one in the map
463 * epoch the current user is working from (i.e., the result is
464 * equivalent to what is in next_osdmap).
465 *
466 * This allows the helpers to start ignoring osds that are about to
467 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
468 * down, without worrying about reopening connections from threads
469 * working from old maps.
470 */
471private:
472 OSDMapRef next_osdmap;
473 Cond pre_publish_cond;
474
475public:
476 void pre_publish_map(OSDMapRef map) {
477 Mutex::Locker l(pre_publish_lock);
478 next_osdmap = std::move(map);
479 }
480
481 void activate_map();
482 /// map epochs reserved below
483 map<epoch_t, unsigned> map_reservations;
484
485 /// gets ref to next_osdmap and registers the epoch as reserved
486 OSDMapRef get_nextmap_reserved() {
487 Mutex::Locker l(pre_publish_lock);
488 if (!next_osdmap)
489 return OSDMapRef();
490 epoch_t e = next_osdmap->get_epoch();
491 map<epoch_t, unsigned>::iterator i =
492 map_reservations.insert(make_pair(e, 0)).first;
493 i->second++;
494 return next_osdmap;
495 }
496 /// releases reservation on map
497 void release_map(OSDMapRef osdmap) {
498 Mutex::Locker l(pre_publish_lock);
499 map<epoch_t, unsigned>::iterator i =
500 map_reservations.find(osdmap->get_epoch());
501 assert(i != map_reservations.end());
502 assert(i->second > 0);
503 if (--(i->second) == 0) {
504 map_reservations.erase(i);
505 }
506 pre_publish_cond.Signal();
507 }
508 /// blocks until there are no reserved maps prior to next_osdmap
509 void await_reserved_maps() {
510 Mutex::Locker l(pre_publish_lock);
511 assert(next_osdmap);
512 while (true) {
513 map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
514 if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
515 break;
516 } else {
517 pre_publish_cond.Wait(pre_publish_lock);
518 }
519 }
520 }
521
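// Illustrative pattern for the reservation helpers above; the caller shape
// and the peer/m variables are assumptions, not code from this file.  A
// thread that wants to act on the about-to-be-published map pins its epoch
// first, then releases it so await_reserved_maps() can make progress.
//
//   OSDMapRef nextmap = service.get_nextmap_reserved();
//   if (nextmap) {
//     ConnectionRef con =
//       service.get_con_osd_cluster(peer, nextmap->get_epoch());
//     if (con)
//       service.send_message_osd_cluster(m, con);
//     service.release_map(nextmap);   // signals pre_publish_cond
//   }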
522private:
523 Mutex peer_map_epoch_lock;
524 map<int, epoch_t> peer_map_epoch;
525public:
526 epoch_t get_peer_epoch(int p);
527 epoch_t note_peer_epoch(int p, epoch_t e);
528 void forget_peer_epoch(int p, epoch_t e);
529
530 void send_map(class MOSDMap *m, Connection *con);
531 void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
532 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
533 OSDSuperblock& superblock);
534 bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
535 const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
536 void share_map(entity_name_t name, Connection *con, epoch_t epoch,
537 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
538 void share_map_peer(int peer, Connection *con,
539 OSDMapRef map = OSDMapRef());
540
541 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
542 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
543 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
544 void send_message_osd_cluster(Message *m, Connection *con) {
545 con->send_message(m);
546 }
547 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
548 con->send_message(m);
549 }
550 void send_message_osd_client(Message *m, Connection *con) {
551 con->send_message(m);
552 }
553 void send_message_osd_client(Message *m, const ConnectionRef& con) {
554 con->send_message(m);
555 }
556 entity_name_t get_cluster_msgr_name() {
557 return cluster_messenger->get_myname();
558 }
559
560private:
561 // -- scrub scheduling --
562 Mutex sched_scrub_lock;
563 int scrubs_pending;
564 int scrubs_active;
565
566public:
567 struct ScrubJob {
568 CephContext* cct;
569 /// pg to be scrubbed
570 spg_t pgid;
571 /// the time the scrub is scheduled for; the scrub can be delayed if system
572 /// load is too high or the time falls outside the allowed scrub hours
573 utime_t sched_time;
574 /// the hard upper bound of scrub time
575 utime_t deadline;
576 ScrubJob() : cct(nullptr) {}
577 explicit ScrubJob(CephContext* cct, const spg_t& pg,
578 const utime_t& timestamp,
579 double pool_scrub_min_interval = 0,
580 double pool_scrub_max_interval = 0, bool must = true);
581 /// order the jobs by sched_time
582 bool operator<(const ScrubJob& rhs) const;
583 };
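// operator< is only declared here.  A minimal sketch of the ordering it
// documents ("order the jobs by sched_time"), with a pgid tie-break so two
// distinct jobs never compare equivalent inside the set, might look like:
//
//   bool OSDService::ScrubJob::operator<(const ScrubJob& rhs) const {
//     if (sched_time != rhs.sched_time)
//       return sched_time < rhs.sched_time;
//     return pgid < rhs.pgid;
//   }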
584 set<ScrubJob> sched_scrub_pg;
585
586 /// @returns the scrub_reg_stamp used to unregister the scrub job
587 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
588 double pool_scrub_max_interval, bool must) {
589 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
590 must);
591 Mutex::Locker l(sched_scrub_lock);
592 sched_scrub_pg.insert(scrub);
593 return scrub.sched_time;
594 }
595 void unreg_pg_scrub(spg_t pgid, utime_t t) {
596 Mutex::Locker l(sched_scrub_lock);
597 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
598 assert(removed);
599 }
600 bool first_scrub_stamp(ScrubJob *out) {
601 Mutex::Locker l(sched_scrub_lock);
602 if (sched_scrub_pg.empty())
603 return false;
604 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
605 *out = *iter;
606 return true;
607 }
608 bool next_scrub_stamp(const ScrubJob& next,
609 ScrubJob *out) {
610 Mutex::Locker l(sched_scrub_lock);
611 if (sched_scrub_pg.empty())
612 return false;
613 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
614 if (iter == sched_scrub_pg.cend())
615 return false;
616 ++iter;
617 if (iter == sched_scrub_pg.cend())
618 return false;
619 *out = *iter;
620 return true;
621 }
622
623 void dumps_scrub(Formatter *f) {
624 assert(f != nullptr);
625 Mutex::Locker l(sched_scrub_lock);
626
627 f->open_array_section("scrubs");
628 for (const auto &i: sched_scrub_pg) {
629 f->open_object_section("scrub");
630 f->dump_stream("pgid") << i.pgid;
631 f->dump_stream("sched_time") << i.sched_time;
632 f->dump_stream("deadline") << i.deadline;
633 f->dump_bool("forced", i.sched_time == i.deadline);
634 f->close_section();
635 }
636 f->close_section();
637 }
638
639 bool can_inc_scrubs_pending();
640 bool inc_scrubs_pending();
641 void inc_scrubs_active(bool reserved);
642 void dec_scrubs_pending();
643 void dec_scrubs_active();
644
645 void reply_op_error(OpRequestRef op, int err);
646 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
647 void handle_misdirected_op(PG *pg, OpRequestRef op);
648
649
650private:
651 // -- agent shared state --
652 Mutex agent_lock;
653 Cond agent_cond;
654 map<uint64_t, set<PGRef> > agent_queue;
655 set<PGRef>::iterator agent_queue_pos;
656 bool agent_valid_iterator;
657 int agent_ops;
658 int flush_mode_high_count; // once any pg enters FLUSH_MODE_HIGH, flush objects at high speed
659 set<hobject_t> agent_oids;
660 bool agent_active;
661 struct AgentThread : public Thread {
662 OSDService *osd;
663 explicit AgentThread(OSDService *o) : osd(o) {}
664 void *entry() override {
665 osd->agent_entry();
666 return NULL;
667 }
668 } agent_thread;
669 bool agent_stop_flag;
670 Mutex agent_timer_lock;
671 SafeTimer agent_timer;
672
673public:
674 void agent_entry();
675 void agent_stop();
676
677 void _enqueue(PG *pg, uint64_t priority) {
678 if (!agent_queue.empty() &&
679 agent_queue.rbegin()->first < priority)
680 agent_valid_iterator = false; // inserting higher-priority queue
681 set<PGRef>& nq = agent_queue[priority];
682 if (nq.empty())
683 agent_cond.Signal();
684 nq.insert(pg);
685 }
686
687 void _dequeue(PG *pg, uint64_t old_priority) {
688 set<PGRef>& oq = agent_queue[old_priority];
689 set<PGRef>::iterator p = oq.find(pg);
690 assert(p != oq.end());
691 if (p == agent_queue_pos)
692 ++agent_queue_pos;
693 oq.erase(p);
694 if (oq.empty()) {
695 if (agent_queue.rbegin()->first == old_priority)
696 agent_valid_iterator = false;
697 agent_queue.erase(old_priority);
698 }
699 }
700
701 /// enable agent for a pg
702 void agent_enable_pg(PG *pg, uint64_t priority) {
703 Mutex::Locker l(agent_lock);
704 _enqueue(pg, priority);
705 }
706
707 /// adjust priority for an enabled pg
708 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
709 Mutex::Locker l(agent_lock);
710 assert(new_priority != old_priority);
711 _enqueue(pg, new_priority);
712 _dequeue(pg, old_priority);
713 }
714
715 /// disable agent for a pg
716 void agent_disable_pg(PG *pg, uint64_t old_priority) {
717 Mutex::Locker l(agent_lock);
718 _dequeue(pg, old_priority);
719 }
720
721 /// note start of an async (evict) op
722 void agent_start_evict_op() {
723 Mutex::Locker l(agent_lock);
724 ++agent_ops;
725 }
726
727 /// note finish or cancellation of an async (evict) op
728 void agent_finish_evict_op() {
729 Mutex::Locker l(agent_lock);
730 assert(agent_ops > 0);
731 --agent_ops;
732 agent_cond.Signal();
733 }
734
735 /// note start of an async (flush) op
736 void agent_start_op(const hobject_t& oid) {
737 Mutex::Locker l(agent_lock);
738 ++agent_ops;
739 assert(agent_oids.count(oid) == 0);
740 agent_oids.insert(oid);
741 }
742
743 /// note finish or cancellation of an async (flush) op
744 void agent_finish_op(const hobject_t& oid) {
745 Mutex::Locker l(agent_lock);
746 assert(agent_ops > 0);
747 --agent_ops;
748 assert(agent_oids.count(oid) == 1);
749 agent_oids.erase(oid);
750 agent_cond.Signal();
751 }
752
753 /// check if we are operating on an object
754 bool agent_is_active_oid(const hobject_t& oid) {
755 Mutex::Locker l(agent_lock);
756 return agent_oids.count(oid);
757 }
758
759 /// get count of active agent ops
760 int agent_get_num_ops() {
761 Mutex::Locker l(agent_lock);
762 return agent_ops;
763 }
764
765 void agent_inc_high_count() {
766 Mutex::Locker l(agent_lock);
767 flush_mode_high_count ++;
768 }
769
770 void agent_dec_high_count() {
771 Mutex::Locker l(agent_lock);
772 flush_mode_high_count --;
773 }
774
775private:
776 /// throttle promotion attempts
777 std::atomic_uint promote_probability_millis{1000}; ///< probability in thousandths; kept to one word for lockless reads
778 PromoteCounter promote_counter;
779 utime_t last_recalibrate;
780 unsigned long promote_max_objects, promote_max_bytes;
781
782public:
783 bool promote_throttle() {
784 // NOTE: lockless! we rely on the probability being a single word.
785 promote_counter.attempt();
786 if ((unsigned)rand() % 1000 > promote_probability_millis)
787 return true; // yes throttle (no promote)
788 if (promote_max_objects &&
789 promote_counter.objects > promote_max_objects)
790 return true; // yes throttle
791 if (promote_max_bytes &&
792 promote_counter.bytes > promote_max_bytes)
793 return true; // yes throttle
794 return false; // no throttle (promote)
795 }
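// Worked example of the lockless check above, with illustrative numbers:
// promote_probability_millis is a probability expressed in thousandths.
// At 250, "rand() % 1000 > 250" holds roughly 75% of the time, so about
// three out of four attempts are throttled and ~25% of candidates may
// promote, still subject to the promote_max_objects / promote_max_bytes
// caps checked below.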
796 void promote_finish(uint64_t bytes) {
797 promote_counter.finish(bytes);
798 }
799 void promote_throttle_recalibrate();
800
801 // -- Objecter, for tiering reads/writes from/to other OSDs --
802 Objecter *objecter;
803 Finisher objecter_finisher;
804
805 // -- Watch --
806 Mutex watch_lock;
807 SafeTimer watch_timer;
808 uint64_t next_notif_id;
809 uint64_t get_next_id(epoch_t cur_epoch) {
810 Mutex::Locker l(watch_lock);
811 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
812 }
813
814 // -- Recovery/Backfill Request Scheduling --
815 Mutex recovery_request_lock;
816 SafeTimer recovery_request_timer;
817
818 // For async recovery sleep
819 bool recovery_needs_sleep = true;
820 utime_t recovery_schedule_time = utime_t();
821
822 Mutex recovery_sleep_lock;
823 SafeTimer recovery_sleep_timer;
824
825 // -- tids --
826 // for ops i issue
827 std::atomic_uint last_tid{0};
828 ceph_tid_t get_tid() {
829 return (ceph_tid_t)last_tid++;
830 }
831
832 // -- backfill_reservation --
833 Finisher reserver_finisher;
834 AsyncReserver<spg_t> local_reserver;
835 AsyncReserver<spg_t> remote_reserver;
836
837 // -- pg_temp --
838private:
839 Mutex pg_temp_lock;
840 map<pg_t, vector<int> > pg_temp_wanted;
841 map<pg_t, vector<int> > pg_temp_pending;
842 void _sent_pg_temp();
843public:
844 void queue_want_pg_temp(pg_t pgid, vector<int>& want);
845 void remove_want_pg_temp(pg_t pgid);
846 void requeue_pg_temp();
847 void send_pg_temp();
848
849 void send_pg_created(pg_t pgid);
850
851 void queue_for_peering(PG *pg);
852
853 Mutex snap_sleep_lock;
854 SafeTimer snap_sleep_timer;
855
856 Mutex scrub_sleep_lock;
857 SafeTimer scrub_sleep_timer;
858
859 AsyncReserver<spg_t> snap_reserver;
860 void queue_for_snap_trim(PG *pg);
861
862 void queue_for_scrub(PG *pg, bool with_high_priority) {
863 unsigned scrub_queue_priority = pg->scrubber.priority;
864 if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
865 scrub_queue_priority = cct->_conf->osd_client_op_priority;
866 }
867 enqueue_back(
868 pg->info.pgid,
869 PGQueueable(
870 PGScrub(pg->get_osdmap()->get_epoch()),
871 cct->_conf->osd_scrub_cost,
872 scrub_queue_priority,
873 ceph_clock_now(),
874 entity_inst_t(),
875 pg->get_osdmap()->get_epoch()));
876 }
877
878private:
879 // -- pg recovery and associated throttling --
880 Mutex recovery_lock;
881 list<pair<epoch_t, PGRef> > awaiting_throttle;
882
883 utime_t defer_recovery_until;
884 uint64_t recovery_ops_active;
885 uint64_t recovery_ops_reserved;
886 bool recovery_paused;
887#ifdef DEBUG_RECOVERY_OIDS
888 map<spg_t, set<hobject_t> > recovery_oids;
889#endif
890 bool _recover_now(uint64_t *available_pushes);
891 void _maybe_queue_recovery();
892 void _queue_for_recovery(
893 pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
894 assert(recovery_lock.is_locked_by_me());
895 enqueue_back(
896 p.second->info.pgid,
897 PGQueueable(
898 PGRecovery(p.first, reserved_pushes),
899 cct->_conf->osd_recovery_cost,
900 cct->_conf->osd_recovery_priority,
901 ceph_clock_now(),
902 entity_inst_t(),
903 p.first));
904 }
905public:
906 void start_recovery_op(PG *pg, const hobject_t& soid);
907 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
908 bool is_recovery_active();
909 void release_reserved_pushes(uint64_t pushes) {
910 Mutex::Locker l(recovery_lock);
911 assert(recovery_ops_reserved >= pushes);
912 recovery_ops_reserved -= pushes;
913 _maybe_queue_recovery();
914 }
915 void defer_recovery(float defer_for) {
916 defer_recovery_until = ceph_clock_now();
917 defer_recovery_until += defer_for;
918 }
919 void pause_recovery() {
920 Mutex::Locker l(recovery_lock);
921 recovery_paused = true;
922 }
923 bool recovery_is_paused() {
924 Mutex::Locker l(recovery_lock);
925 return recovery_paused;
926 }
927 void unpause_recovery() {
928 Mutex::Locker l(recovery_lock);
929 recovery_paused = false;
930 _maybe_queue_recovery();
931 }
932 void kick_recovery_queue() {
933 Mutex::Locker l(recovery_lock);
934 _maybe_queue_recovery();
935 }
936 void clear_queued_recovery(PG *pg) {
937 Mutex::Locker l(recovery_lock);
938 for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
939 i != awaiting_throttle.end();
940 ) {
941 if (i->second.get() == pg) {
942 awaiting_throttle.erase(i);
943 return;
944 } else {
945 ++i;
946 }
947 }
948 }
949 // delayed pg activation
950 void queue_for_recovery(PG *pg, bool front = false) {
951 Mutex::Locker l(recovery_lock);
952 if (front) {
953 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
954 } else {
955 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
956 }
957 _maybe_queue_recovery();
958 }
959 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
960 Mutex::Locker l(recovery_lock);
961 _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
962 }
963
964
965 // osd map cache (past osd maps)
966 Mutex map_cache_lock;
967 SharedLRU<epoch_t, const OSDMap> map_cache;
968 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
969 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
970
971 OSDMapRef try_get_map(epoch_t e);
972 OSDMapRef get_map(epoch_t e) {
973 OSDMapRef ret(try_get_map(e));
974 assert(ret);
975 return ret;
976 }
977 OSDMapRef add_map(OSDMap *o) {
978 Mutex::Locker l(map_cache_lock);
979 return _add_map(o);
980 }
981 OSDMapRef _add_map(OSDMap *o);
982
983 void add_map_bl(epoch_t e, bufferlist& bl) {
984 Mutex::Locker l(map_cache_lock);
985 return _add_map_bl(e, bl);
986 }
987 void pin_map_bl(epoch_t e, bufferlist &bl);
988 void _add_map_bl(epoch_t e, bufferlist& bl);
989 bool get_map_bl(epoch_t e, bufferlist& bl) {
990 Mutex::Locker l(map_cache_lock);
991 return _get_map_bl(e, bl);
992 }
993 bool _get_map_bl(epoch_t e, bufferlist& bl);
994
995 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
996 Mutex::Locker l(map_cache_lock);
997 return _add_map_inc_bl(e, bl);
998 }
999 void pin_map_inc_bl(epoch_t e, bufferlist &bl);
1000 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
1001 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
1002
1003 void clear_map_bl_cache_pins(epoch_t e);
1004
1005 void need_heartbeat_peer_update();
1006
1007 void pg_stat_queue_enqueue(PG *pg);
1008 void pg_stat_queue_dequeue(PG *pg);
1009
1010 void init();
1011 void final_init();
1012 void start_shutdown();
1013 void shutdown_reserver();
1014 void shutdown();
1015
1016private:
1017 // split
1018 Mutex in_progress_split_lock;
1019 map<spg_t, spg_t> pending_splits; // child -> parent
1020 map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
1021 set<spg_t> in_progress_splits; // child
1022
1023public:
1024 void _start_split(spg_t parent, const set<spg_t> &children);
1025 void start_split(spg_t parent, const set<spg_t> &children) {
1026 Mutex::Locker l(in_progress_split_lock);
1027 return _start_split(parent, children);
1028 }
1029 void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
1030 void complete_split(const set<spg_t> &pgs);
1031 void cancel_pending_splits_for_parent(spg_t parent);
1032 void _cancel_pending_splits_for_parent(spg_t parent);
1033 bool splitting(spg_t pgid);
1034 void expand_pg_num(OSDMapRef old_map,
1035 OSDMapRef new_map);
1036 void _maybe_split_pgid(OSDMapRef old_map,
1037 OSDMapRef new_map,
1038 spg_t pgid);
1039 void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
1040
1041 // -- stats --
1042 Mutex stat_lock;
1043 osd_stat_t osd_stat;
1044 uint32_t seq = 0;
1045
1046 void update_osd_stat(vector<int>& hb_peers);
1047 osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
1048 vector<int>& hb_peers);
1049 osd_stat_t get_osd_stat() {
1050 Mutex::Locker l(stat_lock);
1051 ++seq;
1052 osd_stat.up_from = up_epoch;
1053 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
1054 return osd_stat;
1055 }
1056 uint64_t get_osd_stat_seq() {
1057 Mutex::Locker l(stat_lock);
1058 return osd_stat.seq;
1059 }
1060
1061 // -- OSD Full Status --
1062private:
1063 friend TestOpsSocketHook;
1064 mutable Mutex full_status_lock;
1065 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
1066 const char *get_full_state_name(s_names s) const {
1067 switch (s) {
1068 case NONE: return "none";
1069 case NEARFULL: return "nearfull";
1070 case BACKFILLFULL: return "backfillfull";
1071 case FULL: return "full";
1072 case FAILSAFE: return "failsafe";
1073 default: return "???";
1074 }
1075 }
1076 s_names get_full_state(string type) const {
1077 if (type == "none")
1078 return NONE;
1079 else if (type == "failsafe")
1080 return FAILSAFE;
1081 else if (type == "full")
1082 return FULL;
1083 else if (type == "backfillfull")
1084 return BACKFILLFULL;
1085 else if (type == "nearfull")
1086 return NEARFULL;
1087 else
1088 return INVALID;
1089 }
1090 double cur_ratio; ///< current utilization
1091 mutable int64_t injectfull = 0;
1092 s_names injectfull_state = NONE;
1093 float get_failsafe_full_ratio();
1094 void check_full_status(float ratio);
1095 bool _check_full(s_names type, ostream &ss) const;
1096public:
1097 bool check_failsafe_full(ostream &ss) const;
1098 bool check_full(ostream &ss) const;
1099 bool check_backfill_full(ostream &ss) const;
1100 bool check_nearfull(ostream &ss) const;
1101 bool is_failsafe_full() const;
1102 bool is_full() const;
1103 bool is_backfillfull() const;
1104 bool is_nearfull() const;
1105 bool need_fullness_update(); ///< osdmap state needs update
1106 void set_injectfull(s_names type, int64_t count);
1107 bool check_osdmap_full(const set<pg_shard_t> &missing_on);
1108
1109
1110 // -- epochs --
1111private:
1112 mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
1113 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
1114 epoch_t up_epoch; // _most_recent_ epoch we were marked up
1115 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
1116public:
1117 /**
1118 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1119 * can be NULL if you don't care about them.
1120 */
1121 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
1122 epoch_t *_bind_epoch) const;
1123 /**
1124 * Set the boot, up, and bind epochs. Any NULL params will not be set.
1125 */
1126 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1127 const epoch_t *_bind_epoch);
1128 epoch_t get_boot_epoch() const {
1129 epoch_t ret;
1130 retrieve_epochs(&ret, NULL, NULL);
1131 return ret;
1132 }
1133 epoch_t get_up_epoch() const {
1134 epoch_t ret;
1135 retrieve_epochs(NULL, &ret, NULL);
1136 return ret;
1137 }
1138 epoch_t get_bind_epoch() const {
1139 epoch_t ret;
1140 retrieve_epochs(NULL, NULL, &ret);
1141 return ret;
1142 }
1143
1144 // -- stopping --
1145 Mutex is_stopping_lock;
1146 Cond is_stopping_cond;
1147 enum {
1148 NOT_STOPPING,
1149 PREPARING_TO_STOP,
1150 STOPPING };
1151 std::atomic_int state{NOT_STOPPING};
1152 int get_state() {
1153 return state;
1154 }
1155 void set_state(int s) {
1156 state = s;
1157 }
1158 bool is_stopping() const {
1159 return state == STOPPING;
1160 }
1161 bool is_preparing_to_stop() const {
1162 return state == PREPARING_TO_STOP;
1163 }
1164 bool prepare_to_stop();
1165 void got_stop_ack();
1166
1167
1168#ifdef PG_DEBUG_REFS
1169 Mutex pgid_lock;
1170 map<spg_t, int> pgid_tracker;
1171 map<spg_t, PG*> live_pgs;
1172 void add_pgid(spg_t pgid, PG *pg);
1173 void remove_pgid(spg_t pgid, PG *pg);
1174 void dump_live_pgids();
1175#endif
1176
1177 explicit OSDService(OSD *osd);
1178 ~OSDService();
1179};
1180
1181class OSD : public Dispatcher,
1182 public md_config_obs_t {
1183 /** OSD **/
1184 Mutex osd_lock; // global lock
1185 SafeTimer tick_timer; // safe timer (osd_lock)
1186
1187 // Tick timer for work that does not need osd_lock
1188 Mutex tick_timer_lock;
1189 SafeTimer tick_timer_without_osd_lock;
1190public:
1191 // config observer bits
1192 const char** get_tracked_conf_keys() const override;
1193 void handle_conf_change(const struct md_config_t *conf,
1194 const std::set <std::string> &changed) override;
1195 void update_log_config();
1196 void check_config();
1197
1198protected:
1199
1200 static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
1201
1202 AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
1203 AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
1204
1205 Messenger *cluster_messenger;
1206 Messenger *client_messenger;
1207 Messenger *objecter_messenger;
1208 MonClient *monc; // check the "monc helpers" list before accessing directly
1209 MgrClient mgrc;
1210 PerfCounters *logger;
1211 PerfCounters *recoverystate_perf;
1212 ObjectStore *store;
1213#ifdef HAVE_LIBFUSE
1214 FuseStore *fuse_store = nullptr;
1215#endif
1216 LogClient log_client;
1217 LogChannelRef clog;
1218
1219 int whoami;
1220 std::string dev_path, journal_path;
1221
1222 bool store_is_rotational = true;
1223
1224 ZTracer::Endpoint trace_endpoint;
1225 void create_logger();
1226 void create_recoverystate_perf();
1227 void tick();
1228 void tick_without_osd_lock();
1229 void _dispatch(Message *m);
1230 void dispatch_op(OpRequestRef op);
1231
1232 void check_osdmap_features(ObjectStore *store);
1233
1234 // asok
1235 friend class OSDSocketHook;
1236 class OSDSocketHook *asok_hook;
1237 bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
1238
1239public:
1240 ClassHandler *class_handler = nullptr;
1241 int get_nodeid() { return whoami; }
1242
1243 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1244 char foo[20];
1245 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1246 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1247 }
1248 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1249 char foo[22];
1250 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1251 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1252 }
1253
1254 static ghobject_t make_snapmapper_oid() {
1255 return ghobject_t(hobject_t(
1256 sobject_t(
1257 object_t("snapmapper"),
1258 0)));
1259 }
1260
1261 static ghobject_t make_pg_log_oid(spg_t pg) {
1262 stringstream ss;
1263 ss << "pglog_" << pg;
1264 string s;
1265 getline(ss, s);
1266 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1267 }
1268
1269 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1270 stringstream ss;
1271 ss << "pginfo_" << pg;
1272 string s;
1273 getline(ss, s);
1274 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1275 }
1276 static ghobject_t make_infos_oid() {
1277 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1278 return ghobject_t(oid);
1279 }
1280 static void recursive_remove_collection(CephContext* cct,
1281 ObjectStore *store,
1282 spg_t pgid,
1283 coll_t tmp);
1284
1285 /**
1286 * get_osd_initial_compat_set()
1287 *
1288 * Get the initial feature set for this OSD. Features
1289 * here are automatically upgraded.
1290 *
1291 * Return value: Initial osd CompatSet
1292 */
1293 static CompatSet get_osd_initial_compat_set();
1294
1295 /**
1296 * get_osd_compat_set()
1297 *
1298 * Get all features supported by this OSD
1299 *
1300 * Return value: CompatSet of all supported features
1301 */
1302 static CompatSet get_osd_compat_set();
1303
1304
1305private:
1306 class C_Tick;
1307 class C_Tick_WithoutOSDLock;
1308
1309 // -- superblock --
1310 OSDSuperblock superblock;
1311
1312 void write_superblock();
1313 void write_superblock(ObjectStore::Transaction& t);
1314 int read_superblock();
1315
1316 void clear_temp_objects();
1317
1318 CompatSet osd_compat;
1319
1320 // -- state --
1321public:
1322 typedef enum {
1323 STATE_INITIALIZING = 1,
1324 STATE_PREBOOT,
1325 STATE_BOOTING,
1326 STATE_ACTIVE,
1327 STATE_STOPPING,
1328 STATE_WAITING_FOR_HEALTHY
1329 } osd_state_t;
1330
1331 static const char *get_state_name(int s) {
1332 switch (s) {
1333 case STATE_INITIALIZING: return "initializing";
1334 case STATE_PREBOOT: return "preboot";
1335 case STATE_BOOTING: return "booting";
1336 case STATE_ACTIVE: return "active";
1337 case STATE_STOPPING: return "stopping";
1338 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1339 default: return "???";
1340 }
1341 }
1342
1343private:
1344 std::atomic_int state{STATE_INITIALIZING};
1345 bool waiting_for_luminous_mons = false;
1346
1347public:
1348 int get_state() const {
1349 return state;
1350 }
1351 void set_state(int s) {
1352 state = s;
1353 }
1354 bool is_initializing() const {
1355 return state == STATE_INITIALIZING;
1356 }
1357 bool is_preboot() const {
1358 return state == STATE_PREBOOT;
1359 }
1360 bool is_booting() const {
1361 return state == STATE_BOOTING;
1362 }
1363 bool is_active() const {
1364 return state == STATE_ACTIVE;
1365 }
1366 bool is_stopping() const {
1367 return state == STATE_STOPPING;
1368 }
1369 bool is_waiting_for_healthy() const {
1370 return state == STATE_WAITING_FOR_HEALTHY;
1371 }
1372
1373private:
1374
1375 ThreadPool peering_tp;
1376 ShardedThreadPool osd_op_tp;
1377 ThreadPool disk_tp;
1378 ThreadPool command_tp;
1379
1380 void set_disk_tp_priority();
1381 void get_latest_osdmap();
1382
1383 // -- sessions --
1384private:
1385 void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
1386 void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1387
1388 Mutex session_waiting_lock;
1389 set<Session*> session_waiting_for_map;
1390
1391 /// Caller assumes refs for included Sessions
1392 void get_sessions_waiting_for_map(set<Session*> *out) {
1393 Mutex::Locker l(session_waiting_lock);
1394 out->swap(session_waiting_for_map);
1395 }
1396 void register_session_waiting_on_map(Session *session) {
1397 Mutex::Locker l(session_waiting_lock);
1398 if (session_waiting_for_map.insert(session).second) {
1399 session->get();
1400 }
1401 }
1402 void clear_session_waiting_on_map(Session *session) {
1403 Mutex::Locker l(session_waiting_lock);
1404 set<Session*>::iterator i = session_waiting_for_map.find(session);
1405 if (i != session_waiting_for_map.end()) {
1406 (*i)->put();
1407 session_waiting_for_map.erase(i);
1408 }
1409 }
1410 void dispatch_sessions_waiting_on_map() {
1411 set<Session*> sessions_to_check;
1412 get_sessions_waiting_for_map(&sessions_to_check);
1413 for (set<Session*>::iterator i = sessions_to_check.begin();
1414 i != sessions_to_check.end();
1415 sessions_to_check.erase(i++)) {
1416 (*i)->session_dispatch_lock.Lock();
1417 dispatch_session_waiting(*i, osdmap);
1418 (*i)->session_dispatch_lock.Unlock();
1419 (*i)->put();
1420 }
1421 }
1422 void session_handle_reset(Session *session) {
1423 Mutex::Locker l(session->session_dispatch_lock);
1424 clear_session_waiting_on_map(session);
1425
1426 session->clear_backoffs();
1427
1428 /* Messages have connection refs, we need to clear the
1429 * connection->session->message->connection
1430 * cycles which result.
1431 * Bug #12338
1432 */
1433 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1434 }
1435
1436private:
1437 /**
1438 * @defgroup monc helpers
1439 * @{
1440 * Right now we only have the one
1441 */
1442
1443 /**
1444 * Ask the Monitors for a sequence of OSDMaps.
1445 *
1446 * @param epoch The epoch to start with when replying
1447 * @param force_request True if this request forces a new subscription to
1448 * the monitors; false if an outstanding request that encompasses it is
1449 * sufficient.
1450 */
1451 void osdmap_subscribe(version_t epoch, bool force_request);
1452 /** @} monc helpers */
1453
1454 // -- heartbeat --
1455 /// information about a heartbeat peer
1456 struct HeartbeatInfo {
1457 int peer; ///< peer
1458 ConnectionRef con_front; ///< peer connection (front)
1459 ConnectionRef con_back; ///< peer connection (back)
1460 utime_t first_tx; ///< time we sent our first ping request
1461 utime_t last_tx; ///< last time we sent a ping request
1462 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1463 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1464 epoch_t epoch; ///< most recent epoch we wanted this peer
1465
1466 bool is_unhealthy(utime_t cutoff) const {
1467 return
1468 ! ((last_rx_front > cutoff ||
1469 (last_rx_front == utime_t() && (last_tx == utime_t() ||
1470 first_tx > cutoff))) &&
1471 (last_rx_back > cutoff ||
1472 (last_rx_back == utime_t() && (last_tx == utime_t() ||
1473 first_tx > cutoff))));
1474 }
1475 bool is_healthy(utime_t cutoff) const {
1476 return last_rx_front > cutoff && last_rx_back > cutoff;
1477 }
1478
1479 };
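// Reading the predicates above with hypothetical timestamps (cutoff being
// "now minus the grace period"): a peer whose last reply on, say, the back
// interface is older than the cutoff, while pings have been going out
// (first_tx and last_tx also older than the cutoff), is_unhealthy(); a
// peer we have not pinged at all yet (last_rx_* and last_tx still unset)
// gets the benefit of the doubt.  is_healthy() is stricter: it requires a
// reply on *both* front and back since the cutoff.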
1480 /// state attached to outgoing heartbeat connections
1481 struct HeartbeatSession : public RefCountedObject {
1482 int peer;
1483 explicit HeartbeatSession(int p) : peer(p) {}
1484 };
1485 Mutex heartbeat_lock;
1486 map<int, int> debug_heartbeat_drops_remaining;
1487 Cond heartbeat_cond;
1488 bool heartbeat_stop;
1489 std::atomic_bool heartbeat_need_update;
1490 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1491 utime_t last_mon_heartbeat;
1492 Messenger *hb_front_client_messenger;
1493 Messenger *hb_back_client_messenger;
1494 Messenger *hb_front_server_messenger;
1495 Messenger *hb_back_server_messenger;
1496 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1497 double daily_loadavg;
1498
1499 void _add_heartbeat_peer(int p);
1500 void _remove_heartbeat_peer(int p);
1501 bool heartbeat_reset(Connection *con);
1502 void maybe_update_heartbeat_peers();
1503 void reset_heartbeat_peers();
1504 bool heartbeat_peers_need_update() {
1505 return heartbeat_need_update.load();
1506 }
1507 void heartbeat_set_peers_need_update() {
1508 heartbeat_need_update.store(true);
1509 }
1510 void heartbeat_clear_peers_need_update() {
1511 heartbeat_need_update.store(false);
1512 }
1513 void heartbeat();
1514 void heartbeat_check();
1515 void heartbeat_entry();
1516 void need_heartbeat_peer_update();
1517
1518 void heartbeat_kick() {
1519 Mutex::Locker l(heartbeat_lock);
1520 heartbeat_cond.Signal();
1521 }
1522
1523 struct T_Heartbeat : public Thread {
1524 OSD *osd;
1525 explicit T_Heartbeat(OSD *o) : osd(o) {}
1526 void *entry() override {
1527 osd->heartbeat_entry();
1528 return 0;
1529 }
1530 } heartbeat_thread;
1531
1532public:
1533 bool heartbeat_dispatch(Message *m);
1534
1535 struct HeartbeatDispatcher : public Dispatcher {
1536 OSD *osd;
1537 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1538
1539 bool ms_can_fast_dispatch_any() const override { return true; }
1540 bool ms_can_fast_dispatch(const Message *m) const override {
1541 switch (m->get_type()) {
1542 case CEPH_MSG_PING:
1543 case MSG_OSD_PING:
1544 return true;
1545 default:
1546 return false;
1547 }
1548 }
1549 void ms_fast_dispatch(Message *m) override {
1550 osd->heartbeat_dispatch(m);
1551 }
1552 bool ms_dispatch(Message *m) override {
1553 return osd->heartbeat_dispatch(m);
1554 }
1555 bool ms_handle_reset(Connection *con) override {
1556 return osd->heartbeat_reset(con);
1557 }
1558 void ms_handle_remote_reset(Connection *con) override {}
1559 bool ms_handle_refused(Connection *con) override {
1560 return osd->ms_handle_refused(con);
1561 }
1562 bool ms_verify_authorizer(Connection *con, int peer_type,
1563 int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1564 bool& isvalid, CryptoKey& session_key) override {
1565 isvalid = true;
1566 return true;
1567 }
1568 } heartbeat_dispatcher;
1569
1570private:
1571 // -- waiters --
1572 list<OpRequestRef> finished;
1573
1574 void take_waiters(list<OpRequestRef>& ls) {
1575 assert(osd_lock.is_locked());
1576 finished.splice(finished.end(), ls);
1577 }
1578 void do_waiters();
1579
1580 // -- op tracking --
1581 OpTracker op_tracker;
1582 void check_ops_in_flight();
1583 void test_ops(std::string command, std::string args, ostream& ss);
1584 friend class TestOpsSocketHook;
1585 TestOpsSocketHook *test_ops_hook;
1586 friend struct C_CompleteSplits;
1587 friend struct C_OpenPGs;
1588
1589 // -- op queue --
1590 enum class io_queue {
1591 prioritized,
1592 weightedpriority,
1593 mclock_opclass,
1594 mclock_client,
1595 };
1596 friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
1597
1598 const io_queue op_queue;
1599 const unsigned int op_prio_cutoff;
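// Not defined in this header: op_queue and op_prio_cutoff are presumably
// derived from configuration when the OSD is constructed.  As an assumed
// illustration (option names/values as commonly documented for this era,
// treat them as a sketch rather than authoritative):
//
//   # ceph.conf
//   [osd]
//   osd_op_queue = wpq            # -> io_queue::weightedpriority
//   osd_op_queue_cut_off = high   # ops at/above this priority use the
//                                 # strict queue (see op_prio_cutoff)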
1600
1601 /*
1602 * The ordered op delivery chain is:
1603 *
1604 * fast dispatch -> pqueue back
1605 * pqueue front <-> to_process back
1606 * to_process front -> RunVis(item)
1607 * <- queue_front()
1608 *
1609 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1610 * pushed back up into to_process and/or pqueue while order is preserved.
1611 *
1612 * Multiple worker threads can operate on each shard.
1613 *
1614 * Under normal circumstances, num_running == to_process.size(). There are
1615 * two times when that is not true: (1) when waiting_for_pg == true and
1616 * to_process is accumulating requests that are waiting for the pg to be
1617 * instantiated; in that case they will all get requeued together by
1618 * wake_pg_waiters, and (2) right after wake_pg_waiters has run, when it
1619 * has cleared waiting_for_pg and already requeued the items.
1620 */
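// Condensed sketch of the chain described above.  enqueue_op/dequeue_op,
// op_shardedwq, pg_slots and wake_pg_waiters are real names from this
// header; the inline description of the worker loop is illustrative only.
//
//   // fast dispatch path
//   enqueue_op(pgid, op, epoch);        // -> shard pqueue (back)
//
//   // ShardedOpWQ worker (_process), one shard:
//   //   pop pqueue front  -> pg_slots[pgid].to_process (back)
//   //   take to_process front, look up + lock the PG, call dequeue_op()
//   //   if the PG does not exist yet: mark the slot waiting_for_pg and
//   //   park the item until wake_pg_waiters(pgid) requeues it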
1621 friend class PGQueueable;
1622
1623 class ShardedOpWQ
1624 : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
1625 {
1626 struct ShardData {
1627 Mutex sdata_lock;
1628 Cond sdata_cond;
1629
1630 Mutex sdata_op_ordering_lock; ///< protects all members below
1631
1632 OSDMapRef waiting_for_pg_osdmap;
1633 struct pg_slot {
1634 PGRef pg; ///< cached pg reference [optional]
1635 list<PGQueueable> to_process; ///< order items for this slot
1636 int num_running = 0; ///< _process threads doing pg lookup/lock
1637
1638 /// true if pg does/did not exist. if so all new items go directly to
1639 /// to_process. cleared by prune_pg_waiters.
1640 bool waiting_for_pg = false;
1641
1642 /// incremented by wake_pg_waiters; indicates racing _process threads
1643 /// should bail out (their op has been requeued)
1644 uint64_t requeue_seq = 0;
1645 };
1646
1647 /// map of slots for each spg_t. maintains ordering of items dequeued
1648 /// from pqueue while _process thread drops shard lock to acquire the
1649 /// pg lock. slots are removed only by prune_pg_waiters.
1650 unordered_map<spg_t,pg_slot> pg_slots;
1651
1652 /// priority queue
1653 std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
1654
1655 void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
1656 unsigned priority = item.second.get_priority();
1657 unsigned cost = item.second.get_cost();
1658 if (priority >= cutoff)
1659 pqueue->enqueue_strict_front(
1660 item.second.get_owner(),
1661 priority, item);
1662 else
1663 pqueue->enqueue_front(
1664 item.second.get_owner(),
1665 priority, cost, item);
1666 }
1667
1668 ShardData(
1669 string lock_name, string ordering_lock,
1670 uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
1671 io_queue opqueue)
1672 : sdata_lock(lock_name.c_str(), false, true, false, cct),
1673 sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
1674 false, cct) {
1675 if (opqueue == io_queue::weightedpriority) {
1676 pqueue = std::unique_ptr
1677 <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1678 new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1679 max_tok_per_prio, min_cost));
1680 } else if (opqueue == io_queue::prioritized) {
1681 pqueue = std::unique_ptr
1682 <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1683 new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1684 max_tok_per_prio, min_cost));
1685 } else if (opqueue == io_queue::mclock_opclass) {
1686 pqueue = std::unique_ptr
1687 <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
1688 } else if (opqueue == io_queue::mclock_client) {
1689 pqueue = std::unique_ptr
1690 <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
1691 }
1692 }
1693 }; // struct ShardData
1694
1695 vector<ShardData*> shard_list;
1696 OSD *osd;
1697 uint32_t num_shards;
1698
1699 public:
1700 ShardedOpWQ(uint32_t pnum_shards,
1701 OSD *o,
1702 time_t ti,
1703 time_t si,
1704 ShardedThreadPool* tp)
1705 : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
1706 osd(o),
1707 num_shards(pnum_shards) {
1708 for (uint32_t i = 0; i < num_shards; i++) {
1709 char lock_name[32] = {0};
1710 snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
1711 char order_lock[32] = {0};
1712 snprintf(order_lock, sizeof(order_lock), "%s.%d",
1713 "OSD:ShardedOpWQ:order:", i);
1714 ShardData* one_shard = new ShardData(
1715 lock_name, order_lock,
1716 osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
1717 osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
1718 shard_list.push_back(one_shard);
1719 }
1720 }
1721 ~ShardedOpWQ() override {
1722 while (!shard_list.empty()) {
1723 delete shard_list.back();
1724 shard_list.pop_back();
1725 }
1726 }
1727
1728 /// wake any pg waiters after a PG is created/instantiated
1729 void wake_pg_waiters(spg_t pgid);
1730
1731 /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
1732 void prune_pg_waiters(OSDMapRef osdmap, int whoami);
1733
1734 /// clear cached PGRef on pg deletion
1735 void clear_pg_pointer(spg_t pgid);
1736
1737 /// clear pg_slots on shutdown
1738 void clear_pg_slots();
1739
1740 /// try to do some work
1741 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1742
1743 /// enqueue a new item
1744 void _enqueue(pair <spg_t, PGQueueable> item) override;
1745
1746 /// requeue an old item (at the front of the line)
1747 void _enqueue_front(pair <spg_t, PGQueueable> item) override;
1748
1749 void return_waiting_threads() override {
1750 for(uint32_t i = 0; i < num_shards; i++) {
1751 ShardData* sdata = shard_list[i];
1752 assert (NULL != sdata);
1753 sdata->sdata_lock.Lock();
1754 sdata->sdata_cond.Signal();
1755 sdata->sdata_lock.Unlock();
1756 }
1757 }
1758
1759 void dump(Formatter *f) {
1760 for(uint32_t i = 0; i < num_shards; i++) {
1761 ShardData* sdata = shard_list[i];
1762 char lock_name[32] = {0};
1763 snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
1764 assert (NULL != sdata);
1765 sdata->sdata_op_ordering_lock.Lock();
1766 f->open_object_section(lock_name);
1767 sdata->pqueue->dump(f);
1768 f->close_section();
1769 sdata->sdata_op_ordering_lock.Unlock();
1770 }
1771 }
1772
1773 /// Must be called on ops queued back to front
1774 struct Pred {
1775 spg_t pgid;
1776 list<OpRequestRef> *out_ops;
1777 uint64_t reserved_pushes_to_free;
1778 Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
1779 : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
1780 void accumulate(const PGQueueable &op) {
1781 reserved_pushes_to_free += op.get_reserved_pushes();
1782 if (out_ops) {
1783 boost::optional<OpRequestRef> mop = op.maybe_get_op();
1784 if (mop)
1785 out_ops->push_front(*mop);
1786 }
1787 }
1788 bool operator()(const pair<spg_t, PGQueueable> &op) {
1789 if (op.first == pgid) {
1790 accumulate(op.second);
1791 return true;
1792 } else {
1793 return false;
1794 }
1795 }
1796 uint64_t get_reserved_pushes_to_free() const {
1797 return reserved_pushes_to_free;
1798 }
1799 };
1800
1801 bool is_shard_empty(uint32_t thread_index) override {
1802 uint32_t shard_index = thread_index % num_shards;
1803 ShardData* sdata = shard_list[shard_index];
1804 assert(NULL != sdata);
1805 Mutex::Locker l(sdata->sdata_op_ordering_lock);
1806 return sdata->pqueue->empty();
1807 }
1808 } op_shardedwq;
1809
1810
1811 void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
1812 void dequeue_op(
1813 PGRef pg, OpRequestRef op,
1814 ThreadPool::TPHandle &handle);
1815
1816 // -- peering queue --
1817 struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
1818 list<PG*> peering_queue;
1819 OSD *osd;
1820 set<PG*> in_use;
1821 PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
1822 : ThreadPool::BatchWorkQueue<PG>(
1823 "OSD::PeeringWQ", ti, si, tp), osd(o) {}
1824
1825 void _dequeue(PG *pg) override {
1826 for (list<PG*>::iterator i = peering_queue.begin();
1827 i != peering_queue.end();
1828 ) {
1829 if (*i == pg) {
1830 peering_queue.erase(i++);
1831 pg->put("PeeringWQ");
1832 } else {
1833 ++i;
1834 }
1835 }
1836 }
1837 bool _enqueue(PG *pg) override {
1838 pg->get("PeeringWQ");
1839 peering_queue.push_back(pg);
1840 return true;
1841 }
1842 bool _empty() override {
1843 return peering_queue.empty();
1844 }
1845 void _dequeue(list<PG*> *out) override;
1846 void _process(
1847 const list<PG *> &pgs,
1848 ThreadPool::TPHandle &handle) override {
1849 assert(!pgs.empty());
1850 osd->process_peering_events(pgs, handle);
1851 for (list<PG *>::const_iterator i = pgs.begin();
1852 i != pgs.end();
1853 ++i) {
1854 (*i)->put("PeeringWQ");
1855 }
1856 }
1857 void _process_finish(const list<PG *> &pgs) override {
1858 for (list<PG*>::const_iterator i = pgs.begin();
1859 i != pgs.end();
1860 ++i) {
1861 in_use.erase(*i);
1862 }
1863 }
1864 void _clear() override {
1865 assert(peering_queue.empty());
1866 }
1867 } peering_wq;
1868
1869 void process_peering_events(
1870 const list<PG*> &pg,
1871 ThreadPool::TPHandle &handle);
1872
1873 friend class PG;
1874 friend class PrimaryLogPG;
1875
1876
1877 protected:
1878
1879 // -- osd map --
1880 OSDMapRef osdmap;
1881 OSDMapRef get_osdmap() {
1882 return osdmap;
1883 }
1884 epoch_t get_osdmap_epoch() const {
1885 return osdmap ? osdmap->get_epoch() : 0;
1886 }
1887
1888 utime_t had_map_since;
1889 RWLock map_lock;
1890 list<OpRequestRef> waiting_for_osdmap;
1891 deque<utime_t> osd_markdown_log;
1892
1893 friend struct send_map_on_destruct;
1894
1895 void wait_for_new_map(OpRequestRef op);
1896 void handle_osd_map(class MOSDMap *m);
1897 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1898 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1899 void note_down_osd(int osd);
1900 void note_up_osd(int osd);
1901 friend class C_OnMapCommit;
1902
1903 bool advance_pg(
1904 epoch_t advance_to, PG *pg,
1905 ThreadPool::TPHandle &handle,
1906 PG::RecoveryCtx *rctx,
 1907 set<PGRef> *split_pgs
1908 );
1909 void consume_map();
1910 void activate_map();
1911
1912 // osd map cache (past osd maps)
1913 OSDMapRef get_map(epoch_t e) {
1914 return service.get_map(e);
1915 }
1916 OSDMapRef add_map(OSDMap *o) {
1917 return service.add_map(o);
1918 }
1919 void add_map_bl(epoch_t e, bufferlist& bl) {
1920 return service.add_map_bl(e, bl);
1921 }
1922 void pin_map_bl(epoch_t e, bufferlist &bl) {
1923 return service.pin_map_bl(e, bl);
1924 }
1925 bool get_map_bl(epoch_t e, bufferlist& bl) {
1926 return service.get_map_bl(e, bl);
1927 }
1928 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1929 return service.add_map_inc_bl(e, bl);
1930 }
1931 void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
1932 return service.pin_map_inc_bl(e, bl);
1933 }
1934
1935protected:
1936 // -- placement groups --
1937 RWLock pg_map_lock; // this lock orders *above* individual PG _locks
1938 ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
1939
1940 map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
1941 PGRecoveryStats pg_recovery_stats;
1942
1943 PGPool _get_pool(int id, OSDMapRef createmap);
1944
1945 PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
1946 PG *_lookup_lock_pg(spg_t pgid);
1947
1948public:
1949 PG *lookup_lock_pg(spg_t pgid);
1950
1951protected:
1952 PG *_open_lock_pg(OSDMapRef createmap,
1953 spg_t pg, bool no_lockdep_check=false);
1954 enum res_result {
1955 RES_PARENT, // resurrected a parent
1956 RES_SELF, // resurrected self
 1957 RES_NONE // nothing relevant is being deleted
1958 };
1959 res_result _try_resurrect_pg(
1960 OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
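 // Editorial sketch of how a caller might interpret the result; the
 // comments describe assumed follow-up actions, and the actual handling
 // lives in OSD.cc.
#if 0
 spg_t resurrected;
 PGRef old_pg_state;
 switch (_try_resurrect_pg(curmap, pgid, &resurrected, &old_pg_state)) {
 case RES_NONE:    /* nothing being deleted: create the pg afresh */ break;
 case RES_SELF:    /* reuse old_pg_state for this pgid */            break;
 case RES_PARENT:  /* bring back the parent, then split */           break;
 }
#endif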
1961
1962 PG *_create_lock_pg(
1963 OSDMapRef createmap,
1964 spg_t pgid,
1965 bool hold_map_lock,
1966 bool backfill,
1967 int role,
1968 vector<int>& up, int up_primary,
1969 vector<int>& acting, int acting_primary,
1970 pg_history_t history,
1971 const PastIntervals& pi,
1972 ObjectStore::Transaction& t);
1973
1974 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
1975 void add_newly_split_pg(PG *pg,
1976 PG::RecoveryCtx *rctx);
1977
1978 int handle_pg_peering_evt(
1979 spg_t pgid,
1980 const pg_history_t& orig_history,
1981 const PastIntervals& pi,
1982 epoch_t epoch,
1983 PG::CephPeeringEvtRef evt);
1984
1985 void load_pgs();
1986 void build_past_intervals_parallel();
1987
1988 /// build initial pg history and intervals on create
1989 void build_initial_pg_history(
1990 spg_t pgid,
1991 epoch_t created,
1992 utime_t created_stamp,
1993 pg_history_t *h,
1994 PastIntervals *pi);
1995
 1996 /// project pg history from epoch 'from' to now
1997 bool project_pg_history(
1998 spg_t pgid, pg_history_t& h, epoch_t from,
1999 const vector<int>& lastup,
2000 int lastupprimary,
2001 const vector<int>& lastacting,
2002 int lastactingprimary
2003 ); ///< @return false if there was a map gap between from and now
2004
2005 // this must be called with pg->lock held on any pg addition to pg_map
2006 void wake_pg_waiters(PGRef pg) {
2007 assert(pg->is_locked());
2008 op_shardedwq.wake_pg_waiters(pg->info.pgid);
2009 }
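 // Editorial sketch of the expected calling pattern; pg_map_lock (declared
 // above) orders above the individual PG locks, so it is taken first.  The
 // exact upstream sequence lives in OSD.cc.
#if 0
 RWLock::WLocker wl(pg_map_lock);
 pg->lock();
 pg_map[pg->info.pgid] = pg;
 wake_pg_waiters(pg);           // requires pg->lock, per the assert above
 pg->unlock();
#endif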
2010 epoch_t last_pg_create_epoch;
2011
2012 void handle_pg_create(OpRequestRef op);
2013
2014 void split_pgs(
2015 PG *parent,
 2016 const set<spg_t> &childpgids, set<PGRef> *out_pgs,
2017 OSDMapRef curmap,
2018 OSDMapRef nextmap,
2019 PG::RecoveryCtx *rctx);
2020
2021 // == monitor interaction ==
2022 Mutex mon_report_lock;
2023 utime_t last_mon_report;
2024 utime_t last_pg_stats_sent;
2025
2026 /* if our monitor dies, we want to notice it and reconnect.
2027 * So we keep track of when it last acked our stat updates,
2028 * and if too much time passes (and we've been sending
2029 * more updates) then we can call it dead and reconnect
2030 * elsewhere.
2031 */
2032 utime_t last_pg_stats_ack;
2033 float stats_ack_timeout;
 2034 set<uint64_t> outstanding_pg_stats; // tids of stat updates that haven't been acked yet
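 // Editorial sketch of the reconnect heuristic described in the comment
 // above; the exact trigger and the reopen call are assumptions about code
 // in OSD.cc, not declarations from this header.
#if 0
 utime_t now = ceph_clock_now();
 if (!outstanding_pg_stats.empty() &&
     (double)(now - last_pg_stats_ack) > stats_ack_timeout) {
   // the monitor looks unresponsive: drop the session and reconnect
   monc->reopen_session();      // hypothetical use of the MonClient handle
 }
#endif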
2035
2036 // -- boot --
2037 void start_boot();
2038 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2039 void _preboot(epoch_t oldest, epoch_t newest);
2040 void _send_boot();
2041 void _collect_metadata(map<string,string> *pmeta);
2042
2043 void start_waiting_for_healthy();
2044 bool _is_healthy();
2045
2046 void send_full_update();
2047
2048 friend struct C_OSD_GetVersion;
2049
2050 // -- alive --
2051 epoch_t up_thru_wanted;
2052
2053 void queue_want_up_thru(epoch_t want);
2054 void send_alive();
2055
2056 // -- full map requests --
2057 epoch_t requested_full_first, requested_full_last;
2058
2059 void request_full_map(epoch_t first, epoch_t last);
2060 void rerequest_full_maps() {
2061 epoch_t first = requested_full_first;
2062 epoch_t last = requested_full_last;
2063 requested_full_first = 0;
2064 requested_full_last = 0;
2065 request_full_map(first, last);
2066 }
2067 void got_full_map(epoch_t e);
2068
2069 // -- failures --
2070 map<int,utime_t> failure_queue;
2071 map<int,pair<utime_t,entity_inst_t> > failure_pending;
2072
2073 void requeue_failures();
2074 void send_failures();
2075 void send_still_alive(epoch_t epoch, const entity_inst_t &i);
2076
2077 // -- pg stats --
2078 Mutex pg_stat_queue_lock;
2079 Cond pg_stat_queue_cond;
2080 xlist<PG*> pg_stat_queue;
2081 bool osd_stat_updated;
2082 uint64_t pg_stat_tid, pg_stat_tid_flushed;
2083
2084 void send_pg_stats(const utime_t &now);
2085 void handle_pg_stats_ack(class MPGStatsAck *ack);
2086 void flush_pg_stats();
2087
2088 ceph::coarse_mono_clock::time_point last_sent_beacon;
2089 Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2090 epoch_t min_last_epoch_clean = 0;
2091 // which pgs were scanned for min_lec
2092 std::vector<pg_t> min_last_epoch_clean_pgs;
2093 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2094
2095 void pg_stat_queue_enqueue(PG *pg) {
2096 pg_stat_queue_lock.Lock();
2097 if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
2098 pg->get("pg_stat_queue");
2099 pg_stat_queue.push_back(&pg->stat_queue_item);
2100 }
2101 osd_stat_updated = true;
2102 pg_stat_queue_lock.Unlock();
2103 }
2104 void pg_stat_queue_dequeue(PG *pg) {
2105 pg_stat_queue_lock.Lock();
2106 if (pg->stat_queue_item.remove_myself())
2107 pg->put("pg_stat_queue");
2108 pg_stat_queue_lock.Unlock();
2109 }
2110 void clear_pg_stat_queue() {
2111 pg_stat_queue_lock.Lock();
2112 while (!pg_stat_queue.empty()) {
2113 PG *pg = pg_stat_queue.front();
2114 pg_stat_queue.pop_front();
2115 pg->put("pg_stat_queue");
2116 }
2117 pg_stat_queue_lock.Unlock();
2118 }
 2119 void clear_outstanding_pg_stats() {
2120 Mutex::Locker l(pg_stat_queue_lock);
2121 outstanding_pg_stats.clear();
2122 }
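 // Editorial sketch of the (assumed) stat-reporting round trip these
 // helpers support; the actual call sites are in OSD.cc.
#if 0
 pg_stat_queue_enqueue(pg);          // a primary PG has fresh stats
 send_pg_stats(ceph_clock_now());    // drain the queue into a stats message
 // ... later, on an MPGStatsAck from the monitor ...
 handle_pg_stats_ack(ack);           // drop acked entries
#endif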
2123
2124 ceph_tid_t get_tid() {
2125 return service.get_tid();
2126 }
2127
2128 // -- generic pg peering --
2129 PG::RecoveryCtx create_context();
2130 void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2131 ThreadPool::TPHandle *handle = NULL);
2132 void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2133 ThreadPool::TPHandle *handle = NULL);
2134 void do_notifies(map<int,
2135 vector<pair<pg_notify_t, PastIntervals> > >&
2136 notify_list,
2137 OSDMapRef map);
2138 void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2139 OSDMapRef map);
2140 void do_infos(map<int,
2141 vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2142 OSDMapRef map);
2143
2144 bool require_mon_peer(const Message *m);
2145 bool require_mon_or_mgr_peer(const Message *m);
2146 bool require_osd_peer(const Message *m);
 2147 /**
 2148 * Verifies that we were alive in the given epoch, and that
 2149 * we still are.
 2150 */
2151 bool require_self_aliveness(const Message *m, epoch_t alive_since);
2152 /**
2153 * Verifies that the OSD who sent the given op has the same
2154 * address as in the given map.
2155 * @pre op was sent by an OSD using the cluster messenger
2156 */
2157 bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2158 bool is_fast_dispatch);
2159
2160 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2161 bool is_fast_dispatch);
2162
2163 void handle_pg_query(OpRequestRef op);
2164 void handle_pg_notify(OpRequestRef op);
2165 void handle_pg_log(OpRequestRef op);
2166 void handle_pg_info(OpRequestRef op);
2167 void handle_pg_trim(OpRequestRef op);
2168
2169 void handle_pg_backfill_reserve(OpRequestRef op);
2170 void handle_pg_recovery_reserve(OpRequestRef op);
2171
2172 void handle_pg_remove(OpRequestRef op);
2173 void _remove_pg(PG *pg);
2174
2175 // -- commands --
2176 struct Command {
2177 vector<string> cmd;
2178 ceph_tid_t tid;
2179 bufferlist indata;
2180 ConnectionRef con;
2181
2182 Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2183 : cmd(c), tid(t), indata(bl), con(co) {}
2184 };
2185 list<Command*> command_queue;
2186 struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2187 OSD *osd;
2188 CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2189 : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2190
2191 bool _empty() override {
2192 return osd->command_queue.empty();
2193 }
2194 bool _enqueue(Command *c) override {
2195 osd->command_queue.push_back(c);
2196 return true;
2197 }
2198 void _dequeue(Command *pg) override {
2199 ceph_abort();
2200 }
2201 Command *_dequeue() override {
2202 if (osd->command_queue.empty())
2203 return NULL;
2204 Command *c = osd->command_queue.front();
2205 osd->command_queue.pop_front();
2206 return c;
2207 }
2208 void _process(Command *c, ThreadPool::TPHandle &) override {
2209 osd->osd_lock.Lock();
2210 if (osd->is_stopping()) {
2211 osd->osd_lock.Unlock();
2212 delete c;
2213 return;
2214 }
2215 osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
2216 osd->osd_lock.Unlock();
2217 delete c;
2218 }
2219 void _clear() override {
2220 while (!osd->command_queue.empty()) {
2221 Command *c = osd->command_queue.front();
2222 osd->command_queue.pop_front();
2223 delete c;
2224 }
2225 }
2226 } command_wq;
2227
2228 void handle_command(class MMonCommand *m);
2229 void handle_command(class MCommand *m);
2230 void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
2231
2232 // -- pg recovery --
2233 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2234 ThreadPool::TPHandle &handle);
2235
2236
2237 // -- scrubbing --
2238 void sched_scrub();
2239 bool scrub_random_backoff();
2240 bool scrub_load_below_threshold();
2241 bool scrub_time_permit(utime_t now);
2242
2243 // -- removing --
2244 struct RemoveWQ :
2245 public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
2246 CephContext* cct;
2247 ObjectStore *&store;
2248 list<pair<PGRef, DeletingStateRef> > remove_queue;
2249 RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
2250 ThreadPool *tp)
2251 : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
2252 "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
2253
2254 bool _empty() override {
2255 return remove_queue.empty();
2256 }
2257 void _enqueue(pair<PGRef, DeletingStateRef> item) override {
2258 remove_queue.push_back(item);
2259 }
2260 void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
2261 remove_queue.push_front(item);
2262 }
2263 bool _dequeue(pair<PGRef, DeletingStateRef> item) {
2264 ceph_abort();
2265 }
2266 pair<PGRef, DeletingStateRef> _dequeue() override {
2267 assert(!remove_queue.empty());
2268 pair<PGRef, DeletingStateRef> item = remove_queue.front();
2269 remove_queue.pop_front();
2270 return item;
2271 }
2272 void _process(pair<PGRef, DeletingStateRef>,
2273 ThreadPool::TPHandle &) override;
2274 void _clear() override {
2275 remove_queue.clear();
2276 }
2277 } remove_wq;
2278
2279private:
2280 bool ms_can_fast_dispatch_any() const override { return true; }
2281 bool ms_can_fast_dispatch(const Message *m) const override {
2282 switch (m->get_type()) {
2283 case CEPH_MSG_OSD_OP:
2284 case CEPH_MSG_OSD_BACKOFF:
2285 case MSG_OSD_SUBOP:
2286 case MSG_OSD_REPOP:
2287 case MSG_OSD_SUBOPREPLY:
2288 case MSG_OSD_REPOPREPLY:
2289 case MSG_OSD_PG_PUSH:
2290 case MSG_OSD_PG_PULL:
2291 case MSG_OSD_PG_PUSH_REPLY:
2292 case MSG_OSD_PG_SCAN:
2293 case MSG_OSD_PG_BACKFILL:
2294 case MSG_OSD_PG_BACKFILL_REMOVE:
2295 case MSG_OSD_EC_WRITE:
2296 case MSG_OSD_EC_WRITE_REPLY:
2297 case MSG_OSD_EC_READ:
2298 case MSG_OSD_EC_READ_REPLY:
2299 case MSG_OSD_SCRUB_RESERVE:
2300 case MSG_OSD_REP_SCRUB:
2301 case MSG_OSD_REP_SCRUBMAP:
2302 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2303 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2304 return true;
2305 default:
2306 return false;
2307 }
2308 }
2309 void ms_fast_dispatch(Message *m) override;
2310 void ms_fast_preprocess(Message *m) override;
2311 bool ms_dispatch(Message *m) override;
2312 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
2313 bool ms_verify_authorizer(Connection *con, int peer_type,
2314 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
2315 bool& isvalid, CryptoKey& session_key) override;
2316 void ms_handle_connect(Connection *con) override;
2317 void ms_handle_fast_connect(Connection *con) override;
2318 void ms_handle_fast_accept(Connection *con) override;
2319 bool ms_handle_reset(Connection *con) override;
2320 void ms_handle_remote_reset(Connection *con) override {}
2321 bool ms_handle_refused(Connection *con) override;
2322
2323 io_queue get_io_queue() const {
2324 if (cct->_conf->osd_op_queue == "debug_random") {
2325 static io_queue index_lookup[] = { io_queue::prioritized,
2326 io_queue::weightedpriority,
2327 io_queue::mclock_opclass,
2328 io_queue::mclock_client };
 2329 srand(time(NULL));
2330 unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
2331 return index_lookup[which];
 2332 } else if (cct->_conf->osd_op_queue == "wpq") {
2333 return io_queue::weightedpriority;
2334 } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
2335 return io_queue::mclock_opclass;
2336 } else if (cct->_conf->osd_op_queue == "mclock_client") {
2337 return io_queue::mclock_client;
 2338 } else {
 2339 return io_queue::prioritized;
2340 }
2341 }
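 // Editorial note: the branches above mirror the osd_op_queue config
 // option; a minimal usage sketch (the value shown is an example, not the
 // default):
#if 0
 // with "osd op queue = mclock_opclass" in ceph.conf:
 io_queue q = get_io_queue();        // -> io_queue::mclock_opclass
 // "debug_random" instead picks one of the four types pseudo-randomly and
 // is intended for testing only.
#endif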
2342
2343 unsigned int get_io_prio_cut() const {
2344 if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2345 srand(time(NULL));
2346 return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
2347 } else if (cct->_conf->osd_op_queue_cut_off == "low") {
2348 return CEPH_MSG_PRIO_LOW;
2349 } else {
2350 return CEPH_MSG_PRIO_HIGH;
2351 }
2352 }
2353
2354 public:
 2355 /* internal and external can point to the same messenger; they will still
 2356 * be cleaned up properly */
2357 OSD(CephContext *cct_,
2358 ObjectStore *store_,
2359 int id,
2360 Messenger *internal,
2361 Messenger *external,
2362 Messenger *hb_front_client,
2363 Messenger *hb_back_client,
2364 Messenger *hb_front_server,
2365 Messenger *hb_back_server,
2366 Messenger *osdc_messenger,
2367 MonClient *mc, const std::string &dev, const std::string &jdev);
2368 ~OSD() override;
2369
2370 // static bits
2371 static int mkfs(CephContext *cct, ObjectStore *store,
2372 const string& dev,
2373 uuid_d fsid, int whoami);
 2374 /* remove any non-user xattrs (names not prefixed with '_') from an xattr map */
2375 void filter_xattrs(map<string, bufferptr>& attrs) {
2376 for (map<string, bufferptr>::iterator iter = attrs.begin();
2377 iter != attrs.end();
2378 ) {
2379 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2380 attrs.erase(iter++);
2381 else ++iter;
2382 }
2383 }
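 // Editorial usage sketch: only attributes whose names start with '_' and
 // are longer than a single character survive; the names are illustrative.
#if 0
 map<string, bufferptr> attrs;
 attrs["_user.note"] = bufferptr();  // kept: '_'-prefixed user xattr
 attrs["snapset"]    = bufferptr();  // erased: internal attribute
 attrs["_"]          = bufferptr();  // erased: bare '_' (size() == 1)
 filter_xattrs(attrs);               // leaves only "_user.note"
#endif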
2384
2385private:
2386 int mon_cmd_maybe_osd_create(string &cmd);
2387 int update_crush_device_class();
2388 int update_crush_location();
2389
2390 static int write_meta(ObjectStore *store,
2391 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2392
2393 void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
2394 void handle_scrub(struct MOSDScrub *m);
2395 void handle_osd_ping(class MOSDPing *m);
2396
2397 int init_op_flags(OpRequestRef& op);
2398
2399 int get_num_op_shards();
2400 int get_num_op_threads();
2401
2402public:
2403 static int peek_meta(ObjectStore *store, string& magic,
2404 uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
2405
2406
2407 // startup/shutdown
2408 int pre_init();
2409 int init();
2410 void final_init();
2411
2412 int enable_disable_fuse(bool stop);
2413
2414 void suicide(int exitcode);
2415 int shutdown();
2416
2417 void handle_signal(int signum);
2418
2419 /// check if we can throw out op from a disconnected client
2420 static bool op_is_discardable(const MOSDOp *m);
2421
2422public:
2423 OSDService service;
2424 friend class OSDService;
2425};
2426
2427
2428std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
2429
2430
2431// compatibility of the executable
2432extern const CompatSet::Feature ceph_osd_feature_compat[];
2433extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2434extern const CompatSet::Feature ceph_osd_feature_incompat[];
2435
224ce89b 2436#endif // CEPH_OSD_H