]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/OSD.h
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / osd / OSD.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OSD_H
16#define CEPH_OSD_H
17
18#include "PG.h"
19
20#include "msg/Dispatcher.h"
21
22#include "common/Mutex.h"
23#include "common/RWLock.h"
24#include "common/Timer.h"
25#include "common/WorkQueue.h"
26#include "common/AsyncReserver.h"
27#include "common/ceph_context.h"
28#include "common/zipkin_trace.h"
29
30#include "mgr/MgrClient.h"
31
32#include "os/ObjectStore.h"
33#include "OSDCap.h"
34
35#include "auth/KeyRing.h"
36#include "osd/ClassHandler.h"
37
38#include "include/CompatSet.h"
39
40#include "OpRequest.h"
41#include "Session.h"
42
43#include <atomic>
44#include <map>
45#include <memory>
46#include "include/memory.h"
47using namespace std;
48
49#include "include/unordered_map.h"
50
51#include "common/shared_cache.hpp"
52#include "common/simple_cache.hpp"
53#include "common/sharedptr_registry.hpp"
54#include "common/WeightedPriorityQueue.h"
55#include "common/PrioritizedQueue.h"
56#include "messages/MOSDOp.h"
57#include "include/Spinlock.h"
58#include "common/EventTrace.h"
59
60#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
61
62
63enum {
64 l_osd_first = 10000,
65 l_osd_op_wip,
66 l_osd_op,
67 l_osd_op_inb,
68 l_osd_op_outb,
69 l_osd_op_lat,
70 l_osd_op_process_lat,
71 l_osd_op_prepare_lat,
72 l_osd_op_r,
73 l_osd_op_r_outb,
74 l_osd_op_r_lat,
75 l_osd_op_r_lat_outb_hist,
76 l_osd_op_r_process_lat,
77 l_osd_op_r_prepare_lat,
78 l_osd_op_w,
79 l_osd_op_w_inb,
80 l_osd_op_w_lat,
81 l_osd_op_w_lat_inb_hist,
82 l_osd_op_w_process_lat,
83 l_osd_op_w_prepare_lat,
84 l_osd_op_rw,
85 l_osd_op_rw_inb,
86 l_osd_op_rw_outb,
87 l_osd_op_rw_lat,
88 l_osd_op_rw_lat_inb_hist,
89 l_osd_op_rw_lat_outb_hist,
90 l_osd_op_rw_process_lat,
91 l_osd_op_rw_prepare_lat,
92
93 l_osd_sop,
94 l_osd_sop_inb,
95 l_osd_sop_lat,
96 l_osd_sop_w,
97 l_osd_sop_w_inb,
98 l_osd_sop_w_lat,
99 l_osd_sop_pull,
100 l_osd_sop_pull_lat,
101 l_osd_sop_push,
102 l_osd_sop_push_inb,
103 l_osd_sop_push_lat,
104
105 l_osd_pull,
106 l_osd_push,
107 l_osd_push_outb,
108
109 l_osd_rop,
110
111 l_osd_loadavg,
112 l_osd_buf,
113 l_osd_history_alloc_bytes,
114 l_osd_history_alloc_num,
115 l_osd_cached_crc,
116 l_osd_cached_crc_adjusted,
117 l_osd_missed_crc,
118
119 l_osd_pg,
120 l_osd_pg_primary,
121 l_osd_pg_replica,
122 l_osd_pg_stray,
123 l_osd_hb_to,
124 l_osd_map,
125 l_osd_mape,
126 l_osd_mape_dup,
127
128 l_osd_waiting_for_map,
129
130 l_osd_map_cache_hit,
131 l_osd_map_cache_miss,
132 l_osd_map_cache_miss_low,
133 l_osd_map_cache_miss_low_avg,
134
135 l_osd_stat_bytes,
136 l_osd_stat_bytes_used,
137 l_osd_stat_bytes_avail,
138
139 l_osd_copyfrom,
140
141 l_osd_tier_promote,
142 l_osd_tier_flush,
143 l_osd_tier_flush_fail,
144 l_osd_tier_try_flush,
145 l_osd_tier_try_flush_fail,
146 l_osd_tier_evict,
147 l_osd_tier_whiteout,
148 l_osd_tier_dirty,
149 l_osd_tier_clean,
150 l_osd_tier_delay,
151 l_osd_tier_proxy_read,
152 l_osd_tier_proxy_write,
153
154 l_osd_agent_wake,
155 l_osd_agent_skip,
156 l_osd_agent_flush,
157 l_osd_agent_evict,
158
159 l_osd_object_ctx_cache_hit,
160 l_osd_object_ctx_cache_total,
161
162 l_osd_op_cache_hit,
163 l_osd_tier_flush_lat,
164 l_osd_tier_promote_lat,
165 l_osd_tier_r_lat,
166
167 l_osd_pg_info,
168 l_osd_pg_fastinfo,
169 l_osd_pg_biginfo,
170
171 l_osd_last,
172};
173
174// RecoveryState perf counters
175enum {
176 rs_first = 20000,
177 rs_initial_latency,
178 rs_started_latency,
179 rs_reset_latency,
180 rs_start_latency,
181 rs_primary_latency,
182 rs_peering_latency,
183 rs_backfilling_latency,
184 rs_waitremotebackfillreserved_latency,
185 rs_waitlocalbackfillreserved_latency,
186 rs_notbackfilling_latency,
187 rs_repnotrecovering_latency,
188 rs_repwaitrecoveryreserved_latency,
189 rs_repwaitbackfillreserved_latency,
190 rs_reprecovering_latency,
191 rs_activating_latency,
192 rs_waitlocalrecoveryreserved_latency,
193 rs_waitremoterecoveryreserved_latency,
194 rs_recovering_latency,
195 rs_recovered_latency,
196 rs_clean_latency,
197 rs_active_latency,
198 rs_replicaactive_latency,
199 rs_stray_latency,
200 rs_getinfo_latency,
201 rs_getlog_latency,
202 rs_waitactingchange_latency,
203 rs_incomplete_latency,
204 rs_down_latency,
205 rs_getmissing_latency,
206 rs_waitupthru_latency,
207 rs_notrecovering_latency,
208 rs_last,
209};
210
211class Messenger;
212class Message;
213class MonClient;
214class PerfCounters;
215class ObjectStore;
216class FuseStore;
217class OSDMap;
218class MLog;
219class Objecter;
220
221class Watch;
222class PrimaryLogPG;
223
224class AuthAuthorizeHandlerRegistry;
225
226class TestOpsSocketHook;
227struct C_CompleteSplits;
228struct C_OpenPGs;
229class LogChannel;
230class CephContext;
231typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
232class MOSDOp;
233
234class DeletingState {
235 Mutex lock;
236 Cond cond;
237 enum {
238 QUEUED,
239 CLEARING_DIR,
240 CLEARING_WAITING,
241 DELETING_DIR,
242 DELETED_DIR,
243 CANCELED,
244 } status;
245 bool stop_deleting;
246public:
247 const spg_t pgid;
248 const PGRef old_pg_state;
249 explicit DeletingState(const pair<spg_t, PGRef> &in) :
250 lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
251 pgid(in.first), old_pg_state(in.second) {
252 }
253
254 /// transition status to CLEARING_WAITING
255 bool pause_clearing() {
256 Mutex::Locker l(lock);
257 assert(status == CLEARING_DIR);
258 if (stop_deleting) {
259 status = CANCELED;
260 cond.Signal();
261 return false;
262 }
263 status = CLEARING_WAITING;
264 return true;
265 } ///< @return false if we should cancel deletion
266
267 /// start or resume the clearing - transition the status to CLEARING_DIR
268 bool start_or_resume_clearing() {
269 Mutex::Locker l(lock);
270 assert(
271 status == QUEUED ||
272 status == DELETED_DIR ||
273 status == CLEARING_WAITING);
274 if (stop_deleting) {
275 status = CANCELED;
276 cond.Signal();
277 return false;
278 }
279 status = CLEARING_DIR;
280 return true;
281 } ///< @return false if we should cancel the deletion
282
283 /// transition status to CLEARING_DIR
284 bool resume_clearing() {
285 Mutex::Locker l(lock);
286 assert(status == CLEARING_WAITING);
287 if (stop_deleting) {
288 status = CANCELED;
289 cond.Signal();
290 return false;
291 }
292 status = CLEARING_DIR;
293 return true;
294 } ///< @return false if we should cancel deletion
295
296 /// transition status to deleting
297 bool start_deleting() {
298 Mutex::Locker l(lock);
299 assert(status == CLEARING_DIR);
300 if (stop_deleting) {
301 status = CANCELED;
302 cond.Signal();
303 return false;
304 }
305 status = DELETING_DIR;
306 return true;
307 } ///< @return false if we should cancel deletion
308
309 /// signal collection removal queued
310 void finish_deleting() {
311 Mutex::Locker l(lock);
312 assert(status == DELETING_DIR);
313 status = DELETED_DIR;
314 cond.Signal();
315 }
316
317 /// try to halt the deletion
318 bool try_stop_deletion() {
319 Mutex::Locker l(lock);
320 stop_deleting = true;
321 /**
322 * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
323 * operations we have to wait for before continuing on. States
324 * CLEARING_WAITING and QUEUED indicate that the remover will check
325 * stop_deleting before queueing any further operations. CANCELED
326 * indicates that the remover has already halted. DELETED_DIR
327 * indicates that the deletion has been fully queued.
328 */
329 while (status == DELETING_DIR || status == CLEARING_DIR)
330 cond.Wait(lock);
331 return status != DELETED_DIR;
332 } ///< @return true if we don't need to recreate the collection
333};
334typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
335
336class OSD;
337
338struct PGScrub {
339 epoch_t epoch_queued;
340 explicit PGScrub(epoch_t e) : epoch_queued(e) {}
341 ostream &operator<<(ostream &rhs) {
342 return rhs << "PGScrub";
343 }
344};
345
346struct PGSnapTrim {
347 epoch_t epoch_queued;
348 explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
349 ostream &operator<<(ostream &rhs) {
350 return rhs << "PGSnapTrim";
351 }
352};
353
354struct PGRecovery {
355 epoch_t epoch_queued;
356 uint64_t reserved_pushes;
357 PGRecovery(epoch_t e, uint64_t reserved_pushes)
358 : epoch_queued(e), reserved_pushes(reserved_pushes) {}
359 ostream &operator<<(ostream &rhs) {
360 return rhs << "PGRecovery(epoch=" << epoch_queued
361 << ", reserved_pushes: " << reserved_pushes << ")";
362 }
363};
364
365class PGQueueable {
366 typedef boost::variant<
367 OpRequestRef,
368 PGSnapTrim,
369 PGScrub,
370 PGRecovery
371 > QVariant;
372 QVariant qvariant;
373 int cost;
374 unsigned priority;
375 utime_t start_time;
376 entity_inst_t owner;
377 epoch_t map_epoch; ///< an epoch we expect the PG to exist in
378
379 struct RunVis : public boost::static_visitor<> {
380 OSD *osd;
381 PGRef &pg;
382 ThreadPool::TPHandle &handle;
383 RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
384 : osd(osd), pg(pg), handle(handle) {}
385 void operator()(const OpRequestRef &op);
386 void operator()(const PGSnapTrim &op);
387 void operator()(const PGScrub &op);
388 void operator()(const PGRecovery &op);
389 };
390
391 struct StringifyVis : public boost::static_visitor<std::string> {
392 std::string operator()(const OpRequestRef &op) {
393 return stringify(op);
394 }
395 std::string operator()(const PGSnapTrim &op) {
396 return "PGSnapTrim";
397 }
398 std::string operator()(const PGScrub &op) {
399 return "PGScrub";
400 }
401 std::string operator()(const PGRecovery &op) {
402 return "PGRecovery";
403 }
404 };
405 friend ostream& operator<<(ostream& out, const PGQueueable& q) {
406 StringifyVis v;
407 return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
408 << " prio " << q.priority << " cost " << q.cost
409 << " e" << q.map_epoch << ")";
410 }
411
412public:
413 // cppcheck-suppress noExplicitConstructor
414 PGQueueable(OpRequestRef op, epoch_t e)
415 : qvariant(op), cost(op->get_req()->get_cost()),
416 priority(op->get_req()->get_priority()),
417 start_time(op->get_req()->get_recv_stamp()),
418 owner(op->get_req()->get_source_inst()),
419 map_epoch(e)
420 {}
421 PGQueueable(
422 const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
423 const entity_inst_t &owner, epoch_t e)
424 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
425 owner(owner), map_epoch(e) {}
426 PGQueueable(
427 const PGScrub &op, int cost, unsigned priority, utime_t start_time,
428 const entity_inst_t &owner, epoch_t e)
429 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
430 owner(owner), map_epoch(e) {}
431 PGQueueable(
432 const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
433 const entity_inst_t &owner, epoch_t e)
434 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
435 owner(owner), map_epoch(e) {}
436 const boost::optional<OpRequestRef> maybe_get_op() const {
437 const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
438 return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
439 }
440 uint64_t get_reserved_pushes() const {
441 const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
442 return op ? op->reserved_pushes : 0;
443 }
444 void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
445 RunVis v(osd, pg, handle);
446 boost::apply_visitor(v, qvariant);
447 }
448 unsigned get_priority() const { return priority; }
449 int get_cost() const { return cost; }
450 utime_t get_start_time() const { return start_time; }
451 entity_inst_t get_owner() const { return owner; }
452 epoch_t get_map_epoch() const { return map_epoch; }
453};
454
455class OSDService {
456public:
457 OSD *osd;
458 CephContext *cct;
459 SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
460 ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
461 SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
462 const int whoami;
463 ObjectStore *&store;
464 LogClient &log_client;
465 LogChannelRef clog;
466 PGRecoveryStats &pg_recovery_stats;
467private:
468 Messenger *&cluster_messenger;
469 Messenger *&client_messenger;
470public:
471 PerfCounters *&logger;
472 PerfCounters *&recoverystate_perf;
473 MonClient *&monc;
474 ThreadPool::BatchWorkQueue<PG> &peering_wq;
475 GenContextWQ recovery_gen_wq;
476 ClassHandler *&class_handler;
477
478 void enqueue_back(spg_t pgid, PGQueueable qi);
479 void enqueue_front(spg_t pgid, PGQueueable qi);
480
481 void maybe_inject_dispatch_delay() {
482 if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
483 if (rand() % 10000 <
484 g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
485 utime_t t;
486 t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
487 t.sleep();
488 }
489 }
490 }
491
492private:
493 // -- map epoch lower bound --
494 Mutex pg_epoch_lock;
495 multiset<epoch_t> pg_epochs;
496 map<spg_t,epoch_t> pg_epoch;
497
498public:
499 void pg_add_epoch(spg_t pgid, epoch_t epoch) {
500 Mutex::Locker l(pg_epoch_lock);
501 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
502 assert(t == pg_epoch.end());
503 pg_epoch[pgid] = epoch;
504 pg_epochs.insert(epoch);
505 }
506 void pg_update_epoch(spg_t pgid, epoch_t epoch) {
507 Mutex::Locker l(pg_epoch_lock);
508 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
509 assert(t != pg_epoch.end());
510 pg_epochs.erase(pg_epochs.find(t->second));
511 t->second = epoch;
512 pg_epochs.insert(epoch);
513 }
514 void pg_remove_epoch(spg_t pgid) {
515 Mutex::Locker l(pg_epoch_lock);
516 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
517 if (t != pg_epoch.end()) {
518 pg_epochs.erase(pg_epochs.find(t->second));
519 pg_epoch.erase(t);
520 }
521 }
522 epoch_t get_min_pg_epoch() {
523 Mutex::Locker l(pg_epoch_lock);
524 if (pg_epochs.empty())
525 return 0;
526 else
527 return *pg_epochs.begin();
528 }
529
530private:
531 // -- superblock --
532 Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
533 OSDSuperblock superblock;
534
535public:
536 OSDSuperblock get_superblock() {
537 Mutex::Locker l(publish_lock);
538 return superblock;
539 }
540 void publish_superblock(const OSDSuperblock &block) {
541 Mutex::Locker l(publish_lock);
542 superblock = block;
543 }
544
545 int get_nodeid() const { return whoami; }
546
547 std::atomic<epoch_t> max_oldest_map;
548private:
549 OSDMapRef osdmap;
550
551public:
552 OSDMapRef get_osdmap() {
553 Mutex::Locker l(publish_lock);
554 return osdmap;
555 }
556 epoch_t get_osdmap_epoch() {
557 Mutex::Locker l(publish_lock);
558 return osdmap ? osdmap->get_epoch() : 0;
559 }
560 void publish_map(OSDMapRef map) {
561 Mutex::Locker l(publish_lock);
562 osdmap = map;
563 }
564
565 /*
566 * osdmap - current published map
567 * next_osdmap - pre_published map that is about to be published.
568 *
569 * We use the next_osdmap to send messages and initiate connections,
570 * but only if the target is the same instance as the one in the map
571 * epoch the current user is working from (i.e., the result is
572 * equivalent to what is in next_osdmap).
573 *
574 * This allows the helpers to start ignoring osds that are about to
575 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
576 * down, without worrying about reopening connections from threads
577 * working from old maps.
578 */
579private:
580 OSDMapRef next_osdmap;
581 Cond pre_publish_cond;
582
583public:
584 void pre_publish_map(OSDMapRef map) {
585 Mutex::Locker l(pre_publish_lock);
586 next_osdmap = std::move(map);
587 }
588
589 void activate_map();
590 /// map epochs reserved below
591 map<epoch_t, unsigned> map_reservations;
592
593 /// gets ref to next_osdmap and registers the epoch as reserved
594 OSDMapRef get_nextmap_reserved() {
595 Mutex::Locker l(pre_publish_lock);
596 if (!next_osdmap)
597 return OSDMapRef();
598 epoch_t e = next_osdmap->get_epoch();
599 map<epoch_t, unsigned>::iterator i =
600 map_reservations.insert(make_pair(e, 0)).first;
601 i->second++;
602 return next_osdmap;
603 }
604 /// releases reservation on map
605 void release_map(OSDMapRef osdmap) {
606 Mutex::Locker l(pre_publish_lock);
607 map<epoch_t, unsigned>::iterator i =
608 map_reservations.find(osdmap->get_epoch());
609 assert(i != map_reservations.end());
610 assert(i->second > 0);
611 if (--(i->second) == 0) {
612 map_reservations.erase(i);
613 }
614 pre_publish_cond.Signal();
615 }
616 /// blocks until there are no reserved maps prior to next_osdmap
617 void await_reserved_maps() {
618 Mutex::Locker l(pre_publish_lock);
619 assert(next_osdmap);
620 while (true) {
621 map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
622 if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
623 break;
624 } else {
625 pre_publish_cond.Wait(pre_publish_lock);
626 }
627 }
628 }
629
630private:
631 Mutex peer_map_epoch_lock;
632 map<int, epoch_t> peer_map_epoch;
633public:
634 epoch_t get_peer_epoch(int p);
635 epoch_t note_peer_epoch(int p, epoch_t e);
636 void forget_peer_epoch(int p, epoch_t e);
637
638 void send_map(class MOSDMap *m, Connection *con);
639 void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
640 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
641 OSDSuperblock& superblock);
642 bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
643 const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
644 void share_map(entity_name_t name, Connection *con, epoch_t epoch,
645 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
646 void share_map_peer(int peer, Connection *con,
647 OSDMapRef map = OSDMapRef());
648
649 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
650 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
651 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
652 void send_message_osd_cluster(Message *m, Connection *con) {
653 con->send_message(m);
654 }
655 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
656 con->send_message(m);
657 }
658 void send_message_osd_client(Message *m, Connection *con) {
659 con->send_message(m);
660 }
661 void send_message_osd_client(Message *m, const ConnectionRef& con) {
662 con->send_message(m);
663 }
664 entity_name_t get_cluster_msgr_name() {
665 return cluster_messenger->get_myname();
666 }
667
668private:
669 // -- scrub scheduling --
670 Mutex sched_scrub_lock;
671 int scrubs_pending;
672 int scrubs_active;
673
674public:
675 struct ScrubJob {
676 CephContext* cct;
677 /// pg to be scrubbed
678 spg_t pgid;
679 /// a time scheduled for scrub. but the scrub could be delayed if system
680 /// load is too high or it fails to fall in the scrub hours
681 utime_t sched_time;
682 /// the hard upper bound of scrub time
683 utime_t deadline;
684 ScrubJob() : cct(nullptr) {}
685 explicit ScrubJob(CephContext* cct, const spg_t& pg,
686 const utime_t& timestamp,
687 double pool_scrub_min_interval = 0,
688 double pool_scrub_max_interval = 0, bool must = true);
689 /// order the jobs by sched_time
690 bool operator<(const ScrubJob& rhs) const;
691 };
692 set<ScrubJob> sched_scrub_pg;
693
694 /// @returns the scrub_reg_stamp used for unregister the scrub job
695 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
696 double pool_scrub_max_interval, bool must) {
697 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
698 must);
699 Mutex::Locker l(sched_scrub_lock);
700 sched_scrub_pg.insert(scrub);
701 return scrub.sched_time;
702 }
703 void unreg_pg_scrub(spg_t pgid, utime_t t) {
704 Mutex::Locker l(sched_scrub_lock);
705 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
706 assert(removed);
707 }
708 bool first_scrub_stamp(ScrubJob *out) {
709 Mutex::Locker l(sched_scrub_lock);
710 if (sched_scrub_pg.empty())
711 return false;
712 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
713 *out = *iter;
714 return true;
715 }
716 bool next_scrub_stamp(const ScrubJob& next,
717 ScrubJob *out) {
718 Mutex::Locker l(sched_scrub_lock);
719 if (sched_scrub_pg.empty())
720 return false;
721 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
722 if (iter == sched_scrub_pg.cend())
723 return false;
724 ++iter;
725 if (iter == sched_scrub_pg.cend())
726 return false;
727 *out = *iter;
728 return true;
729 }
730
731 void dumps_scrub(Formatter *f) {
732 assert(f != nullptr);
733 Mutex::Locker l(sched_scrub_lock);
734
735 f->open_array_section("scrubs");
736 for (const auto &i: sched_scrub_pg) {
737 f->open_object_section("scrub");
738 f->dump_stream("pgid") << i.pgid;
739 f->dump_stream("sched_time") << i.sched_time;
740 f->dump_stream("deadline") << i.deadline;
741 f->dump_bool("forced", i.sched_time == i.deadline);
742 f->close_section();
743 }
744 f->close_section();
745 }
746
747 bool can_inc_scrubs_pending();
748 bool inc_scrubs_pending();
749 void inc_scrubs_active(bool reserved);
750 void dec_scrubs_pending();
751 void dec_scrubs_active();
752
753 void reply_op_error(OpRequestRef op, int err);
754 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
755 void handle_misdirected_op(PG *pg, OpRequestRef op);
756
757
758private:
759 // -- agent shared state --
760 Mutex agent_lock;
761 Cond agent_cond;
762 map<uint64_t, set<PGRef> > agent_queue;
763 set<PGRef>::iterator agent_queue_pos;
764 bool agent_valid_iterator;
765 int agent_ops;
766 int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
767 set<hobject_t> agent_oids;
768 bool agent_active;
769 struct AgentThread : public Thread {
770 OSDService *osd;
771 explicit AgentThread(OSDService *o) : osd(o) {}
772 void *entry() override {
773 osd->agent_entry();
774 return NULL;
775 }
776 } agent_thread;
777 bool agent_stop_flag;
778 Mutex agent_timer_lock;
779 SafeTimer agent_timer;
780
781public:
782 void agent_entry();
783 void agent_stop();
784
785 void _enqueue(PG *pg, uint64_t priority) {
786 if (!agent_queue.empty() &&
787 agent_queue.rbegin()->first < priority)
788 agent_valid_iterator = false; // inserting higher-priority queue
789 set<PGRef>& nq = agent_queue[priority];
790 if (nq.empty())
791 agent_cond.Signal();
792 nq.insert(pg);
793 }
794
795 void _dequeue(PG *pg, uint64_t old_priority) {
796 set<PGRef>& oq = agent_queue[old_priority];
797 set<PGRef>::iterator p = oq.find(pg);
798 assert(p != oq.end());
799 if (p == agent_queue_pos)
800 ++agent_queue_pos;
801 oq.erase(p);
802 if (oq.empty()) {
803 if (agent_queue.rbegin()->first == old_priority)
804 agent_valid_iterator = false;
805 agent_queue.erase(old_priority);
806 }
807 }
808
809 /// enable agent for a pg
810 void agent_enable_pg(PG *pg, uint64_t priority) {
811 Mutex::Locker l(agent_lock);
812 _enqueue(pg, priority);
813 }
814
815 /// adjust priority for an enagled pg
816 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
817 Mutex::Locker l(agent_lock);
818 assert(new_priority != old_priority);
819 _enqueue(pg, new_priority);
820 _dequeue(pg, old_priority);
821 }
822
823 /// disable agent for a pg
824 void agent_disable_pg(PG *pg, uint64_t old_priority) {
825 Mutex::Locker l(agent_lock);
826 _dequeue(pg, old_priority);
827 }
828
829 /// note start of an async (evict) op
830 void agent_start_evict_op() {
831 Mutex::Locker l(agent_lock);
832 ++agent_ops;
833 }
834
835 /// note finish or cancellation of an async (evict) op
836 void agent_finish_evict_op() {
837 Mutex::Locker l(agent_lock);
838 assert(agent_ops > 0);
839 --agent_ops;
840 agent_cond.Signal();
841 }
842
843 /// note start of an async (flush) op
844 void agent_start_op(const hobject_t& oid) {
845 Mutex::Locker l(agent_lock);
846 ++agent_ops;
847 assert(agent_oids.count(oid) == 0);
848 agent_oids.insert(oid);
849 }
850
851 /// note finish or cancellation of an async (flush) op
852 void agent_finish_op(const hobject_t& oid) {
853 Mutex::Locker l(agent_lock);
854 assert(agent_ops > 0);
855 --agent_ops;
856 assert(agent_oids.count(oid) == 1);
857 agent_oids.erase(oid);
858 agent_cond.Signal();
859 }
860
861 /// check if we are operating on an object
862 bool agent_is_active_oid(const hobject_t& oid) {
863 Mutex::Locker l(agent_lock);
864 return agent_oids.count(oid);
865 }
866
867 /// get count of active agent ops
868 int agent_get_num_ops() {
869 Mutex::Locker l(agent_lock);
870 return agent_ops;
871 }
872
873 void agent_inc_high_count() {
874 Mutex::Locker l(agent_lock);
875 flush_mode_high_count ++;
876 }
877
878 void agent_dec_high_count() {
879 Mutex::Locker l(agent_lock);
880 flush_mode_high_count --;
881 }
882
883private:
884 /// throttle promotion attempts
885 std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word.
886 PromoteCounter promote_counter;
887 utime_t last_recalibrate;
888 unsigned long promote_max_objects, promote_max_bytes;
889
890public:
891 bool promote_throttle() {
892 // NOTE: lockless! we rely on the probability being a single word.
893 promote_counter.attempt();
894 if ((unsigned)rand() % 1000 > promote_probability_millis)
895 return true; // yes throttle (no promote)
896 if (promote_max_objects &&
897 promote_counter.objects > promote_max_objects)
898 return true; // yes throttle
899 if (promote_max_bytes &&
900 promote_counter.bytes > promote_max_bytes)
901 return true; // yes throttle
902 return false; // no throttle (promote)
903 }
904 void promote_finish(uint64_t bytes) {
905 promote_counter.finish(bytes);
906 }
907 void promote_throttle_recalibrate();
908
909 // -- Objecter, for tiering reads/writes from/to other OSDs --
910 Objecter *objecter;
911 Finisher objecter_finisher;
912
913 // -- Watch --
914 Mutex watch_lock;
915 SafeTimer watch_timer;
916 uint64_t next_notif_id;
917 uint64_t get_next_id(epoch_t cur_epoch) {
918 Mutex::Locker l(watch_lock);
919 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
920 }
921
922 // -- Recovery/Backfill Request Scheduling --
923 Mutex recovery_request_lock;
924 SafeTimer recovery_request_timer;
925
926 // -- tids --
927 // for ops i issue
928 std::atomic_uint last_tid{0};
929 ceph_tid_t get_tid() {
930 return (ceph_tid_t)last_tid++;
931 }
932
933 // -- backfill_reservation --
934 Finisher reserver_finisher;
935 AsyncReserver<spg_t> local_reserver;
936 AsyncReserver<spg_t> remote_reserver;
937
938 // -- pg_temp --
939private:
940 Mutex pg_temp_lock;
941 map<pg_t, vector<int> > pg_temp_wanted;
942 map<pg_t, vector<int> > pg_temp_pending;
943 void _sent_pg_temp();
944public:
945 void queue_want_pg_temp(pg_t pgid, vector<int>& want);
946 void remove_want_pg_temp(pg_t pgid);
947 void requeue_pg_temp();
948 void send_pg_temp();
949
950 void send_pg_created(pg_t pgid);
951
952 void queue_for_peering(PG *pg);
953
954 Mutex snap_sleep_lock;
955 SafeTimer snap_sleep_timer;
956
957 AsyncReserver<spg_t> snap_reserver;
958 void queue_for_snap_trim(PG *pg);
959
960 void queue_for_scrub(PG *pg) {
961 enqueue_back(
962 pg->info.pgid,
963 PGQueueable(
964 PGScrub(pg->get_osdmap()->get_epoch()),
965 cct->_conf->osd_scrub_cost,
966 pg->scrubber.priority,
967 ceph_clock_now(),
968 entity_inst_t(),
969 pg->get_osdmap()->get_epoch()));
970 }
971
972private:
973 // -- pg recovery and associated throttling --
974 Mutex recovery_lock;
975 list<pair<epoch_t, PGRef> > awaiting_throttle;
976
977 utime_t defer_recovery_until;
978 uint64_t recovery_ops_active;
979 uint64_t recovery_ops_reserved;
980 bool recovery_paused;
981#ifdef DEBUG_RECOVERY_OIDS
982 map<spg_t, set<hobject_t> > recovery_oids;
983#endif
984 bool _recover_now(uint64_t *available_pushes);
985 void _maybe_queue_recovery();
986 void _queue_for_recovery(
987 pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
988 assert(recovery_lock.is_locked_by_me());
989 enqueue_back(
990 p.second->info.pgid,
991 PGQueueable(
992 PGRecovery(p.first, reserved_pushes),
993 cct->_conf->osd_recovery_cost,
994 cct->_conf->osd_recovery_priority,
995 ceph_clock_now(),
996 entity_inst_t(),
997 p.first));
998 }
999public:
1000 void start_recovery_op(PG *pg, const hobject_t& soid);
1001 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
1002 bool is_recovery_active();
1003 void release_reserved_pushes(uint64_t pushes) {
1004 Mutex::Locker l(recovery_lock);
1005 assert(recovery_ops_reserved >= pushes);
1006 recovery_ops_reserved -= pushes;
1007 _maybe_queue_recovery();
1008 }
1009 void defer_recovery(float defer_for) {
1010 defer_recovery_until = ceph_clock_now();
1011 defer_recovery_until += defer_for;
1012 }
1013 void pause_recovery() {
1014 Mutex::Locker l(recovery_lock);
1015 recovery_paused = true;
1016 }
1017 bool recovery_is_paused() {
1018 Mutex::Locker l(recovery_lock);
1019 return recovery_paused;
1020 }
1021 void unpause_recovery() {
1022 Mutex::Locker l(recovery_lock);
1023 recovery_paused = false;
1024 _maybe_queue_recovery();
1025 }
1026 void kick_recovery_queue() {
1027 Mutex::Locker l(recovery_lock);
1028 _maybe_queue_recovery();
1029 }
1030 void clear_queued_recovery(PG *pg) {
1031 Mutex::Locker l(recovery_lock);
1032 for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
1033 i != awaiting_throttle.end();
1034 ) {
1035 if (i->second.get() == pg) {
1036 awaiting_throttle.erase(i);
1037 return;
1038 } else {
1039 ++i;
1040 }
1041 }
1042 }
1043 // delayed pg activation
1044 void queue_for_recovery(PG *pg, bool front = false) {
1045 Mutex::Locker l(recovery_lock);
1046 if (front) {
1047 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
1048 } else {
1049 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
1050 }
1051 _maybe_queue_recovery();
1052 }
1053
1054
1055 // osd map cache (past osd maps)
1056 Mutex map_cache_lock;
1057 SharedLRU<epoch_t, const OSDMap> map_cache;
1058 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
1059 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
1060
1061 OSDMapRef try_get_map(epoch_t e);
1062 OSDMapRef get_map(epoch_t e) {
1063 OSDMapRef ret(try_get_map(e));
1064 assert(ret);
1065 return ret;
1066 }
1067 OSDMapRef add_map(OSDMap *o) {
1068 Mutex::Locker l(map_cache_lock);
1069 return _add_map(o);
1070 }
1071 OSDMapRef _add_map(OSDMap *o);
1072
1073 void add_map_bl(epoch_t e, bufferlist& bl) {
1074 Mutex::Locker l(map_cache_lock);
1075 return _add_map_bl(e, bl);
1076 }
1077 void pin_map_bl(epoch_t e, bufferlist &bl);
1078 void _add_map_bl(epoch_t e, bufferlist& bl);
1079 bool get_map_bl(epoch_t e, bufferlist& bl) {
1080 Mutex::Locker l(map_cache_lock);
1081 return _get_map_bl(e, bl);
1082 }
1083 bool _get_map_bl(epoch_t e, bufferlist& bl);
1084
1085 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1086 Mutex::Locker l(map_cache_lock);
1087 return _add_map_inc_bl(e, bl);
1088 }
1089 void pin_map_inc_bl(epoch_t e, bufferlist &bl);
1090 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
1091 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
1092
1093 void clear_map_bl_cache_pins(epoch_t e);
1094
1095 void need_heartbeat_peer_update();
1096
1097 void pg_stat_queue_enqueue(PG *pg);
1098 void pg_stat_queue_dequeue(PG *pg);
1099
1100 void init();
1101 void final_init();
1102 void start_shutdown();
1103 void shutdown();
1104
1105private:
1106 // split
1107 Mutex in_progress_split_lock;
1108 map<spg_t, spg_t> pending_splits; // child -> parent
1109 map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
1110 set<spg_t> in_progress_splits; // child
1111
1112public:
1113 void _start_split(spg_t parent, const set<spg_t> &children);
1114 void start_split(spg_t parent, const set<spg_t> &children) {
1115 Mutex::Locker l(in_progress_split_lock);
1116 return _start_split(parent, children);
1117 }
1118 void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
1119 void complete_split(const set<spg_t> &pgs);
1120 void cancel_pending_splits_for_parent(spg_t parent);
1121 void _cancel_pending_splits_for_parent(spg_t parent);
1122 bool splitting(spg_t pgid);
1123 void expand_pg_num(OSDMapRef old_map,
1124 OSDMapRef new_map);
1125 void _maybe_split_pgid(OSDMapRef old_map,
1126 OSDMapRef new_map,
1127 spg_t pgid);
1128 void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
1129
1130 // -- stats --
1131 Mutex stat_lock;
1132 osd_stat_t osd_stat;
1133
1134 void update_osd_stat(vector<int>& hb_peers);
1135 osd_stat_t get_osd_stat() {
1136 Mutex::Locker l(stat_lock);
1137 return osd_stat;
1138 }
1139
1140 // -- OSD Full Status --
1141private:
1142 friend TestOpsSocketHook;
1143 mutable Mutex full_status_lock;
1144 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
1145 const char *get_full_state_name(s_names s) const {
1146 switch (s) {
1147 case NONE: return "none";
1148 case NEARFULL: return "nearfull";
1149 case BACKFILLFULL: return "backfillfull";
1150 case FULL: return "full";
1151 case FAILSAFE: return "failsafe";
1152 default: return "???";
1153 }
1154 }
1155 s_names get_full_state(string type) const {
1156 if (type == "none")
1157 return NONE;
1158 else if (type == "failsafe")
1159 return FAILSAFE;
1160 else if (type == "full")
1161 return FULL;
1162 else if (type == "backfillfull")
1163 return BACKFILLFULL;
1164 else if (type == "nearfull")
1165 return NEARFULL;
1166 else
1167 return INVALID;
1168 }
1169 double cur_ratio; ///< current utilization
1170 mutable int64_t injectfull = 0;
1171 s_names injectfull_state = NONE;
1172 float get_failsafe_full_ratio();
1173 void check_full_status(const osd_stat_t &stat);
1174 bool _check_full(s_names type, ostream &ss) const;
1175public:
1176 bool check_failsafe_full(ostream &ss) const;
1177 bool check_full(ostream &ss) const;
1178 bool check_backfill_full(ostream &ss) const;
1179 bool check_nearfull(ostream &ss) const;
1180 bool is_failsafe_full() const;
1181 bool is_full() const;
1182 bool is_backfillfull() const;
1183 bool is_nearfull() const;
1184 bool need_fullness_update(); ///< osdmap state needs update
1185 void set_injectfull(s_names type, int64_t count);
1186 bool check_osdmap_full(const set<pg_shard_t> &missing_on);
1187
1188
1189 // -- epochs --
1190private:
1191 mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
1192 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
1193 epoch_t up_epoch; // _most_recent_ epoch we were marked up
1194 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
1195public:
1196 /**
1197 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1198 * can be NULL if you don't care about them.
1199 */
1200 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
1201 epoch_t *_bind_epoch) const;
1202 /**
1203 * Set the boot, up, and bind epochs. Any NULL params will not be set.
1204 */
1205 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1206 const epoch_t *_bind_epoch);
1207 epoch_t get_boot_epoch() const {
1208 epoch_t ret;
1209 retrieve_epochs(&ret, NULL, NULL);
1210 return ret;
1211 }
1212 epoch_t get_up_epoch() const {
1213 epoch_t ret;
1214 retrieve_epochs(NULL, &ret, NULL);
1215 return ret;
1216 }
1217 epoch_t get_bind_epoch() const {
1218 epoch_t ret;
1219 retrieve_epochs(NULL, NULL, &ret);
1220 return ret;
1221 }
1222
1223 // -- stopping --
1224 Mutex is_stopping_lock;
1225 Cond is_stopping_cond;
1226 enum {
1227 NOT_STOPPING,
1228 PREPARING_TO_STOP,
1229 STOPPING };
1230 std::atomic_int state{NOT_STOPPING};
1231 int get_state() {
1232 return state;
1233 }
1234 void set_state(int s) {
1235 state = s;
1236 }
1237 bool is_stopping() const {
1238 return state == STOPPING;
1239 }
1240 bool is_preparing_to_stop() const {
1241 return state == PREPARING_TO_STOP;
1242 }
1243 bool prepare_to_stop();
1244 void got_stop_ack();
1245
1246
1247#ifdef PG_DEBUG_REFS
1248 Mutex pgid_lock;
1249 map<spg_t, int> pgid_tracker;
1250 map<spg_t, PG*> live_pgs;
1251 void add_pgid(spg_t pgid, PG *pg) {
1252 Mutex::Locker l(pgid_lock);
1253 if (!pgid_tracker.count(pgid)) {
1254 live_pgs[pgid] = pg;
1255 }
1256 pgid_tracker[pgid]++;
1257 }
1258 void remove_pgid(spg_t pgid, PG *pg) {
1259 Mutex::Locker l(pgid_lock);
1260 assert(pgid_tracker.count(pgid));
1261 assert(pgid_tracker[pgid] > 0);
1262 pgid_tracker[pgid]--;
1263 if (pgid_tracker[pgid] == 0) {
1264 pgid_tracker.erase(pgid);
1265 live_pgs.erase(pgid);
1266 }
1267 }
1268 void dump_live_pgids() {
1269 Mutex::Locker l(pgid_lock);
1270 derr << "live pgids:" << dendl;
1271 for (map<spg_t, int>::const_iterator i = pgid_tracker.cbegin();
1272 i != pgid_tracker.cend();
1273 ++i) {
1274 derr << "\t" << *i << dendl;
1275 live_pgs[i->first]->dump_live_ids();
1276 }
1277 }
1278#endif
1279
1280 explicit OSDService(OSD *osd);
1281 ~OSDService();
1282};
1283
1284class OSD : public Dispatcher,
1285 public md_config_obs_t {
1286 /** OSD **/
1287 Mutex osd_lock; // global lock
1288 SafeTimer tick_timer; // safe timer (osd_lock)
1289
1290 // Tick timer for those stuff that do not need osd_lock
1291 Mutex tick_timer_lock;
1292 SafeTimer tick_timer_without_osd_lock;
1293public:
1294 // config observer bits
1295 const char** get_tracked_conf_keys() const override;
1296 void handle_conf_change(const struct md_config_t *conf,
1297 const std::set <std::string> &changed) override;
1298 void update_log_config();
1299 void check_config();
1300
1301protected:
1302
1303 static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
1304
1305 AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
1306 AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
1307
1308 Messenger *cluster_messenger;
1309 Messenger *client_messenger;
1310 Messenger *objecter_messenger;
1311 MonClient *monc; // check the "monc helpers" list before accessing directly
1312 MgrClient mgrc;
1313 PerfCounters *logger;
1314 PerfCounters *recoverystate_perf;
1315 ObjectStore *store;
1316#ifdef HAVE_LIBFUSE
1317 FuseStore *fuse_store = nullptr;
1318#endif
1319 LogClient log_client;
1320 LogChannelRef clog;
1321
1322 int whoami;
1323 std::string dev_path, journal_path;
1324
1325 ZTracer::Endpoint trace_endpoint;
1326 void create_logger();
1327 void create_recoverystate_perf();
1328 void tick();
1329 void tick_without_osd_lock();
1330 void _dispatch(Message *m);
1331 void dispatch_op(OpRequestRef op);
1332
1333 void check_osdmap_features(ObjectStore *store);
1334
1335 // asok
1336 friend class OSDSocketHook;
1337 class OSDSocketHook *asok_hook;
1338 bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
1339
1340public:
1341 ClassHandler *class_handler = nullptr;
1342 int get_nodeid() { return whoami; }
1343
1344 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1345 char foo[20];
1346 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1347 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1348 }
1349 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1350 char foo[22];
1351 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1352 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1353 }
1354
1355 static ghobject_t make_snapmapper_oid() {
1356 return ghobject_t(hobject_t(
1357 sobject_t(
1358 object_t("snapmapper"),
1359 0)));
1360 }
1361
1362 static ghobject_t make_pg_log_oid(spg_t pg) {
1363 stringstream ss;
1364 ss << "pglog_" << pg;
1365 string s;
1366 getline(ss, s);
1367 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1368 }
1369
1370 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1371 stringstream ss;
1372 ss << "pginfo_" << pg;
1373 string s;
1374 getline(ss, s);
1375 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1376 }
1377 static ghobject_t make_infos_oid() {
1378 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1379 return ghobject_t(oid);
1380 }
1381 static void recursive_remove_collection(CephContext* cct,
1382 ObjectStore *store,
1383 spg_t pgid,
1384 coll_t tmp);
1385
1386 /**
1387 * get_osd_initial_compat_set()
1388 *
1389 * Get the initial feature set for this OSD. Features
1390 * here are automatically upgraded.
1391 *
1392 * Return value: Initial osd CompatSet
1393 */
1394 static CompatSet get_osd_initial_compat_set();
1395
1396 /**
1397 * get_osd_compat_set()
1398 *
1399 * Get all features supported by this OSD
1400 *
1401 * Return value: CompatSet of all supported features
1402 */
1403 static CompatSet get_osd_compat_set();
1404
1405
1406private:
1407 class C_Tick;
1408 class C_Tick_WithoutOSDLock;
1409
1410 // -- superblock --
1411 OSDSuperblock superblock;
1412
1413 void write_superblock();
1414 void write_superblock(ObjectStore::Transaction& t);
1415 int read_superblock();
1416
1417 void clear_temp_objects();
1418
1419 CompatSet osd_compat;
1420
1421 // -- state --
1422public:
1423 typedef enum {
1424 STATE_INITIALIZING = 1,
1425 STATE_PREBOOT,
1426 STATE_BOOTING,
1427 STATE_ACTIVE,
1428 STATE_STOPPING,
1429 STATE_WAITING_FOR_HEALTHY
1430 } osd_state_t;
1431
1432 static const char *get_state_name(int s) {
1433 switch (s) {
1434 case STATE_INITIALIZING: return "initializing";
1435 case STATE_PREBOOT: return "preboot";
1436 case STATE_BOOTING: return "booting";
1437 case STATE_ACTIVE: return "active";
1438 case STATE_STOPPING: return "stopping";
1439 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1440 default: return "???";
1441 }
1442 }
1443
1444private:
1445 std::atomic_int state{STATE_INITIALIZING};
1446
1447public:
1448 int get_state() const {
1449 return state;
1450 }
1451 void set_state(int s) {
1452 state = s;
1453 }
1454 bool is_initializing() const {
1455 return state == STATE_INITIALIZING;
1456 }
1457 bool is_preboot() const {
1458 return state == STATE_PREBOOT;
1459 }
1460 bool is_booting() const {
1461 return state == STATE_BOOTING;
1462 }
1463 bool is_active() const {
1464 return state == STATE_ACTIVE;
1465 }
1466 bool is_stopping() const {
1467 return state == STATE_STOPPING;
1468 }
1469 bool is_waiting_for_healthy() const {
1470 return state == STATE_WAITING_FOR_HEALTHY;
1471 }
1472
1473private:
1474
1475 ThreadPool osd_tp;
1476 ShardedThreadPool osd_op_tp;
1477 ThreadPool disk_tp;
1478 ThreadPool command_tp;
1479
1480 void set_disk_tp_priority();
1481 void get_latest_osdmap();
1482
1483 // -- sessions --
1484private:
1485 void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
1486 void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1487
1488 Mutex session_waiting_lock;
1489 set<Session*> session_waiting_for_map;
1490
1491 /// Caller assumes refs for included Sessions
1492 void get_sessions_waiting_for_map(set<Session*> *out) {
1493 Mutex::Locker l(session_waiting_lock);
1494 out->swap(session_waiting_for_map);
1495 }
1496 void register_session_waiting_on_map(Session *session) {
1497 Mutex::Locker l(session_waiting_lock);
1498 if (session_waiting_for_map.insert(session).second) {
1499 session->get();
1500 }
1501 }
1502 void clear_session_waiting_on_map(Session *session) {
1503 Mutex::Locker l(session_waiting_lock);
1504 set<Session*>::iterator i = session_waiting_for_map.find(session);
1505 if (i != session_waiting_for_map.end()) {
1506 (*i)->put();
1507 session_waiting_for_map.erase(i);
1508 }
1509 }
1510 void dispatch_sessions_waiting_on_map() {
1511 set<Session*> sessions_to_check;
1512 get_sessions_waiting_for_map(&sessions_to_check);
1513 for (set<Session*>::iterator i = sessions_to_check.begin();
1514 i != sessions_to_check.end();
1515 sessions_to_check.erase(i++)) {
1516 (*i)->session_dispatch_lock.Lock();
1517 dispatch_session_waiting(*i, osdmap);
1518 (*i)->session_dispatch_lock.Unlock();
1519 (*i)->put();
1520 }
1521 }
1522 void session_handle_reset(Session *session) {
1523 Mutex::Locker l(session->session_dispatch_lock);
1524 clear_session_waiting_on_map(session);
1525
1526 session->clear_backoffs();
1527
1528 /* Messages have connection refs, we need to clear the
1529 * connection->session->message->connection
1530 * cycles which result.
1531 * Bug #12338
1532 */
1533 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1534 }
1535
1536private:
1537 /**
1538 * @defgroup monc helpers
1539 * @{
1540 * Right now we only have the one
1541 */
1542
1543 /**
1544 * Ask the Monitors for a sequence of OSDMaps.
1545 *
1546 * @param epoch The epoch to start with when replying
1547 * @param force_request True if this request forces a new subscription to
1548 * the monitors; false if an outstanding request that encompasses it is
1549 * sufficient.
1550 */
1551 void osdmap_subscribe(version_t epoch, bool force_request);
1552 /** @} monc helpers */
1553
1554 // -- heartbeat --
1555 /// information about a heartbeat peer
1556 struct HeartbeatInfo {
1557 int peer; ///< peer
1558 ConnectionRef con_front; ///< peer connection (front)
1559 ConnectionRef con_back; ///< peer connection (back)
1560 utime_t first_tx; ///< time we sent our first ping request
1561 utime_t last_tx; ///< last time we sent a ping request
1562 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1563 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1564 epoch_t epoch; ///< most recent epoch we wanted this peer
1565
1566 bool is_unhealthy(utime_t cutoff) const {
1567 return
1568 ! ((last_rx_front > cutoff ||
1569 (last_rx_front == utime_t() && (last_tx == utime_t() ||
1570 first_tx > cutoff))) &&
1571 (last_rx_back > cutoff ||
1572 (last_rx_back == utime_t() && (last_tx == utime_t() ||
1573 first_tx > cutoff))));
1574 }
1575 bool is_healthy(utime_t cutoff) const {
1576 return last_rx_front > cutoff && last_rx_back > cutoff;
1577 }
1578
1579 };
1580 /// state attached to outgoing heartbeat connections
1581 struct HeartbeatSession : public RefCountedObject {
1582 int peer;
1583 explicit HeartbeatSession(int p) : peer(p) {}
1584 };
1585 Mutex heartbeat_lock;
1586 map<int, int> debug_heartbeat_drops_remaining;
1587 Cond heartbeat_cond;
1588 bool heartbeat_stop;
1589 std::atomic_bool heartbeat_need_update;
1590 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1591 utime_t last_mon_heartbeat;
1592 Messenger *hb_front_client_messenger;
1593 Messenger *hb_back_client_messenger;
1594 Messenger *hb_front_server_messenger;
1595 Messenger *hb_back_server_messenger;
1596 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1597 double daily_loadavg;
1598
1599 void _add_heartbeat_peer(int p);
1600 void _remove_heartbeat_peer(int p);
1601 bool heartbeat_reset(Connection *con);
1602 void maybe_update_heartbeat_peers();
1603 void reset_heartbeat_peers();
1604 bool heartbeat_peers_need_update() {
1605 return heartbeat_need_update.load();
1606 }
1607 void heartbeat_set_peers_need_update() {
1608 heartbeat_need_update.store(true);
1609 }
1610 void heartbeat_clear_peers_need_update() {
1611 heartbeat_need_update.store(false);
1612 }
1613 void heartbeat();
1614 void heartbeat_check();
1615 void heartbeat_entry();
1616 void need_heartbeat_peer_update();
1617
1618 void heartbeat_kick() {
1619 Mutex::Locker l(heartbeat_lock);
1620 heartbeat_cond.Signal();
1621 }
1622
1623 struct T_Heartbeat : public Thread {
1624 OSD *osd;
1625 explicit T_Heartbeat(OSD *o) : osd(o) {}
1626 void *entry() override {
1627 osd->heartbeat_entry();
1628 return 0;
1629 }
1630 } heartbeat_thread;
1631
1632public:
1633 bool heartbeat_dispatch(Message *m);
1634
1635 struct HeartbeatDispatcher : public Dispatcher {
1636 OSD *osd;
1637 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1638
1639 bool ms_can_fast_dispatch_any() const override { return true; }
1640 bool ms_can_fast_dispatch(const Message *m) const override {
1641 switch (m->get_type()) {
1642 case CEPH_MSG_PING:
1643 case MSG_OSD_PING:
1644 return true;
1645 default:
1646 return false;
1647 }
1648 }
1649 void ms_fast_dispatch(Message *m) override {
1650 osd->heartbeat_dispatch(m);
1651 }
1652 bool ms_dispatch(Message *m) override {
1653 return osd->heartbeat_dispatch(m);
1654 }
1655 bool ms_handle_reset(Connection *con) override {
1656 return osd->heartbeat_reset(con);
1657 }
1658 void ms_handle_remote_reset(Connection *con) override {}
1659 bool ms_handle_refused(Connection *con) override {
1660 return osd->ms_handle_refused(con);
1661 }
1662 bool ms_verify_authorizer(Connection *con, int peer_type,
1663 int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1664 bool& isvalid, CryptoKey& session_key) override {
1665 isvalid = true;
1666 return true;
1667 }
1668 } heartbeat_dispatcher;
1669
1670private:
1671 // -- waiters --
1672 list<OpRequestRef> finished;
1673
1674 void take_waiters(list<OpRequestRef>& ls) {
1675 assert(osd_lock.is_locked());
1676 finished.splice(finished.end(), ls);
1677 }
1678 void do_waiters();
1679
1680 // -- op tracking --
1681 OpTracker op_tracker;
1682 void check_ops_in_flight();
1683 void test_ops(std::string command, std::string args, ostream& ss);
1684 friend class TestOpsSocketHook;
1685 TestOpsSocketHook *test_ops_hook;
1686 friend struct C_CompleteSplits;
1687 friend struct C_OpenPGs;
1688
1689 // -- op queue --
1690 enum io_queue {
1691 prioritized,
1692 weightedpriority
1693 };
1694 const io_queue op_queue;
1695 const unsigned int op_prio_cutoff;
1696
1697 /*
1698 * The ordered op delivery chain is:
1699 *
1700 * fast dispatch -> pqueue back
1701 * pqueue front <-> to_process back
1702 * to_process front -> RunVis(item)
1703 * <- queue_front()
1704 *
1705 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1706 * pushed back up into to_process and/or pqueue while order is preserved.
1707 *
1708 * Multiple worker threads can operate on each shard.
1709 *
1710 * Under normal circumstances, num_running == to_proces.size(). There are
1711 * two times when that is not true: (1) when waiting_for_pg == true and
1712 * to_process is accumulating requests that are waiting for the pg to be
1713 * instantiated; in that case they will all get requeued together by
1714 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
1715 * and already requeued the items.
1716 */
1717 friend class PGQueueable;
1718 class ShardedOpWQ
1719 : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
1720 {
1721 struct ShardData {
1722 Mutex sdata_lock;
1723 Cond sdata_cond;
1724
1725 Mutex sdata_op_ordering_lock; ///< protects all members below
1726
1727 OSDMapRef waiting_for_pg_osdmap;
1728 struct pg_slot {
1729 PGRef pg; ///< cached pg reference [optional]
1730 list<PGQueueable> to_process; ///< order items for this slot
1731 int num_running = 0; ///< _process threads doing pg lookup/lock
1732
1733 /// true if pg does/did not exist. if so all new items go directly to
1734 /// to_process. cleared by prune_pg_waiters.
1735 bool waiting_for_pg = false;
1736
1737 /// incremented by wake_pg_waiters; indicates racing _process threads
1738 /// should bail out (their op has been requeued)
1739 uint64_t requeue_seq = 0;
1740 };
1741
1742 /// map of slots for each spg_t. maintains ordering of items dequeued
1743 /// from pqueue while _process thread drops shard lock to acquire the
1744 /// pg lock. slots are removed only by prune_pg_waiters.
1745 unordered_map<spg_t,pg_slot> pg_slots;
1746
1747 /// priority queue
1748 std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
1749
1750 void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
1751 unsigned priority = item.second.get_priority();
1752 unsigned cost = item.second.get_cost();
1753 if (priority >= cutoff)
1754 pqueue->enqueue_strict_front(
1755 item.second.get_owner(),
1756 priority, item);
1757 else
1758 pqueue->enqueue_front(
1759 item.second.get_owner(),
1760 priority, cost, item);
1761 }
1762
1763 ShardData(
1764 string lock_name, string ordering_lock,
1765 uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
1766 io_queue opqueue)
1767 : sdata_lock(lock_name.c_str(), false, true, false, cct),
1768 sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
1769 false, cct) {
1770 if (opqueue == weightedpriority) {
1771 pqueue = std::unique_ptr
1772 <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1773 new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1774 max_tok_per_prio, min_cost));
1775 } else if (opqueue == prioritized) {
1776 pqueue = std::unique_ptr
1777 <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1778 new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1779 max_tok_per_prio, min_cost));
1780 }
1781 }
1782 };
1783
1784 vector<ShardData*> shard_list;
1785 OSD *osd;
1786 uint32_t num_shards;
1787
1788 public:
1789 ShardedOpWQ(uint32_t pnum_shards,
1790 OSD *o,
1791 time_t ti,
1792 time_t si,
1793 ShardedThreadPool* tp)
1794 : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
1795 osd(o),
1796 num_shards(pnum_shards) {
1797 for (uint32_t i = 0; i < num_shards; i++) {
1798 char lock_name[32] = {0};
1799 snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
1800 char order_lock[32] = {0};
1801 snprintf(order_lock, sizeof(order_lock), "%s.%d",
1802 "OSD:ShardedOpWQ:order:", i);
1803 ShardData* one_shard = new ShardData(
1804 lock_name, order_lock,
1805 osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
1806 osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
1807 shard_list.push_back(one_shard);
1808 }
1809 }
1810 ~ShardedOpWQ() override {
1811 while (!shard_list.empty()) {
1812 delete shard_list.back();
1813 shard_list.pop_back();
1814 }
1815 }
1816
1817 /// wake any pg waiters after a PG is created/instantiated
1818 void wake_pg_waiters(spg_t pgid);
1819
1820 /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
1821 void prune_pg_waiters(OSDMapRef osdmap, int whoami);
1822
1823 /// clear cached PGRef on pg deletion
1824 void clear_pg_pointer(spg_t pgid);
1825
1826 /// clear pg_slots on shutdown
1827 void clear_pg_slots();
1828
1829 /// try to do some work
1830 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1831
1832 /// enqueue a new item
1833 void _enqueue(pair <spg_t, PGQueueable> item) override;
1834
1835 /// requeue an old item (at the front of the line)
1836 void _enqueue_front(pair <spg_t, PGQueueable> item) override;
1837
1838 void return_waiting_threads() override {
1839 for(uint32_t i = 0; i < num_shards; i++) {
1840 ShardData* sdata = shard_list[i];
1841 assert (NULL != sdata);
1842 sdata->sdata_lock.Lock();
1843 sdata->sdata_cond.Signal();
1844 sdata->sdata_lock.Unlock();
1845 }
1846 }
1847
1848 void dump(Formatter *f) {
1849 for(uint32_t i = 0; i < num_shards; i++) {
1850 ShardData* sdata = shard_list[i];
1851 char lock_name[32] = {0};
1852 snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
1853 assert (NULL != sdata);
1854 sdata->sdata_op_ordering_lock.Lock();
1855 f->open_object_section(lock_name);
1856 sdata->pqueue->dump(f);
1857 f->close_section();
1858 sdata->sdata_op_ordering_lock.Unlock();
1859 }
1860 }
1861
1862 /// Must be called on ops queued back to front
1863 struct Pred {
1864 spg_t pgid;
1865 list<OpRequestRef> *out_ops;
1866 uint64_t reserved_pushes_to_free;
1867 Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
1868 : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
1869 void accumulate(const PGQueueable &op) {
1870 reserved_pushes_to_free += op.get_reserved_pushes();
1871 if (out_ops) {
1872 boost::optional<OpRequestRef> mop = op.maybe_get_op();
1873 if (mop)
1874 out_ops->push_front(*mop);
1875 }
1876 }
1877 bool operator()(const pair<spg_t, PGQueueable> &op) {
1878 if (op.first == pgid) {
1879 accumulate(op.second);
1880 return true;
1881 } else {
1882 return false;
1883 }
1884 }
1885 uint64_t get_reserved_pushes_to_free() const {
1886 return reserved_pushes_to_free;
1887 }
1888 };
1889
1890 bool is_shard_empty(uint32_t thread_index) override {
1891 uint32_t shard_index = thread_index % num_shards;
1892 ShardData* sdata = shard_list[shard_index];
1893 assert(NULL != sdata);
1894 Mutex::Locker l(sdata->sdata_op_ordering_lock);
1895 return sdata->pqueue->empty();
1896 }
1897 } op_shardedwq;
1898
1899
1900 void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
1901 void dequeue_op(
1902 PGRef pg, OpRequestRef op,
1903 ThreadPool::TPHandle &handle);
1904
1905 // -- peering queue --
1906 struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
1907 list<PG*> peering_queue;
1908 OSD *osd;
1909 set<PG*> in_use;
1910 PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
1911 : ThreadPool::BatchWorkQueue<PG>(
1912 "OSD::PeeringWQ", ti, si, tp), osd(o) {}
1913
1914 void _dequeue(PG *pg) override {
1915 for (list<PG*>::iterator i = peering_queue.begin();
1916 i != peering_queue.end();
1917 ) {
1918 if (*i == pg) {
1919 peering_queue.erase(i++);
1920 pg->put("PeeringWQ");
1921 } else {
1922 ++i;
1923 }
1924 }
1925 }
1926 bool _enqueue(PG *pg) override {
1927 pg->get("PeeringWQ");
1928 peering_queue.push_back(pg);
1929 return true;
1930 }
1931 bool _empty() override {
1932 return peering_queue.empty();
1933 }
1934 void _dequeue(list<PG*> *out) override;
1935 void _process(
1936 const list<PG *> &pgs,
1937 ThreadPool::TPHandle &handle) override {
1938 assert(!pgs.empty());
1939 osd->process_peering_events(pgs, handle);
1940 for (list<PG *>::const_iterator i = pgs.begin();
1941 i != pgs.end();
1942 ++i) {
1943 (*i)->put("PeeringWQ");
1944 }
1945 }
1946 void _process_finish(const list<PG *> &pgs) override {
1947 for (list<PG*>::const_iterator i = pgs.begin();
1948 i != pgs.end();
1949 ++i) {
1950 in_use.erase(*i);
1951 }
1952 }
1953 void _clear() override {
1954 assert(peering_queue.empty());
1955 }
1956 } peering_wq;
1957
1958 void process_peering_events(
1959 const list<PG*> &pg,
1960 ThreadPool::TPHandle &handle);
1961
1962 friend class PG;
1963 friend class PrimaryLogPG;
1964
1965
1966 protected:
1967
1968 // -- osd map --
1969 OSDMapRef osdmap;
1970 OSDMapRef get_osdmap() {
1971 return osdmap;
1972 }
1973 epoch_t get_osdmap_epoch() {
1974 return osdmap ? osdmap->get_epoch() : 0;
1975 }
1976
1977 utime_t had_map_since;
1978 RWLock map_lock;
1979 list<OpRequestRef> waiting_for_osdmap;
1980 deque<utime_t> osd_markdown_log;
1981
1982 friend struct send_map_on_destruct;
1983
1984 void wait_for_new_map(OpRequestRef op);
1985 void handle_osd_map(class MOSDMap *m);
1986 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1987 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1988 void note_down_osd(int osd);
1989 void note_up_osd(int osd);
1990 friend class C_OnMapCommit;
1991
1992 bool advance_pg(
1993 epoch_t advance_to, PG *pg,
1994 ThreadPool::TPHandle &handle,
1995 PG::RecoveryCtx *rctx,
1996 set<boost::intrusive_ptr<PG> > *split_pgs
1997 );
1998 void consume_map();
1999 void activate_map();
2000
2001 // osd map cache (past osd maps)
2002 OSDMapRef get_map(epoch_t e) {
2003 return service.get_map(e);
2004 }
2005 OSDMapRef add_map(OSDMap *o) {
2006 return service.add_map(o);
2007 }
2008 void add_map_bl(epoch_t e, bufferlist& bl) {
2009 return service.add_map_bl(e, bl);
2010 }
2011 void pin_map_bl(epoch_t e, bufferlist &bl) {
2012 return service.pin_map_bl(e, bl);
2013 }
2014 bool get_map_bl(epoch_t e, bufferlist& bl) {
2015 return service.get_map_bl(e, bl);
2016 }
2017 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
2018 return service.add_map_inc_bl(e, bl);
2019 }
2020 void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
2021 return service.pin_map_inc_bl(e, bl);
2022 }
2023
2024protected:
2025 // -- placement groups --
2026 RWLock pg_map_lock; // this lock orders *above* individual PG _locks
2027 ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
2028
2029 map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
2030 PGRecoveryStats pg_recovery_stats;
2031
2032 PGPool _get_pool(int id, OSDMapRef createmap);
2033
2034 PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
2035 PG *_lookup_lock_pg(spg_t pgid);
2036 PG *_open_lock_pg(OSDMapRef createmap,
2037 spg_t pg, bool no_lockdep_check=false);
2038 enum res_result {
2039 RES_PARENT, // resurrected a parent
2040 RES_SELF, // resurrected self
2041 RES_NONE // nothing relevant deleting
2042 };
2043 res_result _try_resurrect_pg(
2044 OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
2045
2046 PG *_create_lock_pg(
2047 OSDMapRef createmap,
2048 spg_t pgid,
2049 bool hold_map_lock,
2050 bool backfill,
2051 int role,
2052 vector<int>& up, int up_primary,
2053 vector<int>& acting, int acting_primary,
2054 pg_history_t history,
2055 const PastIntervals& pi,
2056 ObjectStore::Transaction& t);
2057
2058 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
2059 void add_newly_split_pg(PG *pg,
2060 PG::RecoveryCtx *rctx);
2061
2062 int handle_pg_peering_evt(
2063 spg_t pgid,
2064 const pg_history_t& orig_history,
2065 const PastIntervals& pi,
2066 epoch_t epoch,
2067 PG::CephPeeringEvtRef evt);
2068
2069 void load_pgs();
2070 void build_past_intervals_parallel();
2071
2072 /// build initial pg history and intervals on create
2073 void build_initial_pg_history(
2074 spg_t pgid,
2075 epoch_t created,
2076 utime_t created_stamp,
2077 pg_history_t *h,
2078 PastIntervals *pi);
2079
2080 /// project pg history from from to now
2081 bool project_pg_history(
2082 spg_t pgid, pg_history_t& h, epoch_t from,
2083 const vector<int>& lastup,
2084 int lastupprimary,
2085 const vector<int>& lastacting,
2086 int lastactingprimary
2087 ); ///< @return false if there was a map gap between from and now
2088
2089 // this must be called with pg->lock held on any pg addition to pg_map
2090 void wake_pg_waiters(PGRef pg) {
2091 assert(pg->is_locked());
2092 op_shardedwq.wake_pg_waiters(pg->info.pgid);
2093 }
2094 epoch_t last_pg_create_epoch;
2095
2096 void handle_pg_create(OpRequestRef op);
2097
2098 void split_pgs(
2099 PG *parent,
2100 const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
2101 OSDMapRef curmap,
2102 OSDMapRef nextmap,
2103 PG::RecoveryCtx *rctx);
2104
2105 // == monitor interaction ==
2106 Mutex mon_report_lock;
2107 utime_t last_mon_report;
2108 utime_t last_pg_stats_sent;
2109
2110 /* if our monitor dies, we want to notice it and reconnect.
2111 * So we keep track of when it last acked our stat updates,
2112 * and if too much time passes (and we've been sending
2113 * more updates) then we can call it dead and reconnect
2114 * elsewhere.
2115 */
2116 utime_t last_pg_stats_ack;
2117 float stats_ack_timeout;
2118 set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
2119
2120 // -- boot --
2121 void start_boot();
2122 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2123 void _preboot(epoch_t oldest, epoch_t newest);
2124 void _send_boot();
2125 void _collect_metadata(map<string,string> *pmeta);
2126
2127 void start_waiting_for_healthy();
2128 bool _is_healthy();
2129
2130 void send_full_update();
2131
2132 friend struct C_OSD_GetVersion;
2133
2134 // -- alive --
2135 epoch_t up_thru_wanted;
2136
2137 void queue_want_up_thru(epoch_t want);
2138 void send_alive();
2139
2140 // -- full map requests --
2141 epoch_t requested_full_first, requested_full_last;
2142
2143 void request_full_map(epoch_t first, epoch_t last);
2144 void rerequest_full_maps() {
2145 epoch_t first = requested_full_first;
2146 epoch_t last = requested_full_last;
2147 requested_full_first = 0;
2148 requested_full_last = 0;
2149 request_full_map(first, last);
2150 }
2151 void got_full_map(epoch_t e);
2152
2153 // -- failures --
2154 map<int,utime_t> failure_queue;
2155 map<int,pair<utime_t,entity_inst_t> > failure_pending;
2156
2157 void requeue_failures();
2158 void send_failures();
2159 void send_still_alive(epoch_t epoch, const entity_inst_t &i);
2160
2161 // -- pg stats --
2162 Mutex pg_stat_queue_lock;
2163 Cond pg_stat_queue_cond;
2164 xlist<PG*> pg_stat_queue;
2165 bool osd_stat_updated;
2166 uint64_t pg_stat_tid, pg_stat_tid_flushed;
2167
2168 void send_pg_stats(const utime_t &now);
2169 void handle_pg_stats_ack(class MPGStatsAck *ack);
2170 void flush_pg_stats();
2171
2172 ceph::coarse_mono_clock::time_point last_sent_beacon;
2173 Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2174 epoch_t min_last_epoch_clean = 0;
2175 // which pgs were scanned for min_lec
2176 std::vector<pg_t> min_last_epoch_clean_pgs;
2177 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2178
2179 void pg_stat_queue_enqueue(PG *pg) {
2180 pg_stat_queue_lock.Lock();
2181 if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
2182 pg->get("pg_stat_queue");
2183 pg_stat_queue.push_back(&pg->stat_queue_item);
2184 }
2185 osd_stat_updated = true;
2186 pg_stat_queue_lock.Unlock();
2187 }
2188 void pg_stat_queue_dequeue(PG *pg) {
2189 pg_stat_queue_lock.Lock();
2190 if (pg->stat_queue_item.remove_myself())
2191 pg->put("pg_stat_queue");
2192 pg_stat_queue_lock.Unlock();
2193 }
2194 void clear_pg_stat_queue() {
2195 pg_stat_queue_lock.Lock();
2196 while (!pg_stat_queue.empty()) {
2197 PG *pg = pg_stat_queue.front();
2198 pg_stat_queue.pop_front();
2199 pg->put("pg_stat_queue");
2200 }
2201 pg_stat_queue_lock.Unlock();
2202 }
2203
2204 ceph_tid_t get_tid() {
2205 return service.get_tid();
2206 }
2207
2208 // -- generic pg peering --
2209 PG::RecoveryCtx create_context();
2210 void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2211 ThreadPool::TPHandle *handle = NULL);
2212 void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2213 ThreadPool::TPHandle *handle = NULL);
2214 void do_notifies(map<int,
2215 vector<pair<pg_notify_t, PastIntervals> > >&
2216 notify_list,
2217 OSDMapRef map);
2218 void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2219 OSDMapRef map);
2220 void do_infos(map<int,
2221 vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2222 OSDMapRef map);
2223
2224 bool require_mon_peer(const Message *m);
2225 bool require_mon_or_mgr_peer(const Message *m);
2226 bool require_osd_peer(const Message *m);
2227 /***
2228 * Verifies that we were alive in the given epoch, and that
2229 * still are.
2230 */
2231 bool require_self_aliveness(const Message *m, epoch_t alive_since);
2232 /**
2233 * Verifies that the OSD who sent the given op has the same
2234 * address as in the given map.
2235 * @pre op was sent by an OSD using the cluster messenger
2236 */
2237 bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2238 bool is_fast_dispatch);
2239
2240 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2241 bool is_fast_dispatch);
2242
2243 void handle_pg_query(OpRequestRef op);
2244 void handle_pg_notify(OpRequestRef op);
2245 void handle_pg_log(OpRequestRef op);
2246 void handle_pg_info(OpRequestRef op);
2247 void handle_pg_trim(OpRequestRef op);
2248
2249 void handle_pg_backfill_reserve(OpRequestRef op);
2250 void handle_pg_recovery_reserve(OpRequestRef op);
2251
2252 void handle_pg_remove(OpRequestRef op);
2253 void _remove_pg(PG *pg);
2254
2255 // -- commands --
2256 struct Command {
2257 vector<string> cmd;
2258 ceph_tid_t tid;
2259 bufferlist indata;
2260 ConnectionRef con;
2261
2262 Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2263 : cmd(c), tid(t), indata(bl), con(co) {}
2264 };
2265 list<Command*> command_queue;
2266 struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2267 OSD *osd;
2268 CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2269 : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2270
2271 bool _empty() override {
2272 return osd->command_queue.empty();
2273 }
2274 bool _enqueue(Command *c) override {
2275 osd->command_queue.push_back(c);
2276 return true;
2277 }
2278 void _dequeue(Command *pg) override {
2279 ceph_abort();
2280 }
2281 Command *_dequeue() override {
2282 if (osd->command_queue.empty())
2283 return NULL;
2284 Command *c = osd->command_queue.front();
2285 osd->command_queue.pop_front();
2286 return c;
2287 }
2288 void _process(Command *c, ThreadPool::TPHandle &) override {
2289 osd->osd_lock.Lock();
2290 if (osd->is_stopping()) {
2291 osd->osd_lock.Unlock();
2292 delete c;
2293 return;
2294 }
2295 osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
2296 osd->osd_lock.Unlock();
2297 delete c;
2298 }
2299 void _clear() override {
2300 while (!osd->command_queue.empty()) {
2301 Command *c = osd->command_queue.front();
2302 osd->command_queue.pop_front();
2303 delete c;
2304 }
2305 }
2306 } command_wq;
2307
2308 void handle_command(class MMonCommand *m);
2309 void handle_command(class MCommand *m);
2310 void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
2311
2312 // -- pg recovery --
2313 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2314 ThreadPool::TPHandle &handle);
2315
2316
2317 // -- scrubbing --
2318 void sched_scrub();
2319 bool scrub_random_backoff();
2320 bool scrub_load_below_threshold();
2321 bool scrub_time_permit(utime_t now);
2322
2323 // -- removing --
2324 struct RemoveWQ :
2325 public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
2326 CephContext* cct;
2327 ObjectStore *&store;
2328 list<pair<PGRef, DeletingStateRef> > remove_queue;
2329 RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
2330 ThreadPool *tp)
2331 : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
2332 "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
2333
2334 bool _empty() override {
2335 return remove_queue.empty();
2336 }
2337 void _enqueue(pair<PGRef, DeletingStateRef> item) override {
2338 remove_queue.push_back(item);
2339 }
2340 void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
2341 remove_queue.push_front(item);
2342 }
2343 bool _dequeue(pair<PGRef, DeletingStateRef> item) {
2344 ceph_abort();
2345 }
2346 pair<PGRef, DeletingStateRef> _dequeue() override {
2347 assert(!remove_queue.empty());
2348 pair<PGRef, DeletingStateRef> item = remove_queue.front();
2349 remove_queue.pop_front();
2350 return item;
2351 }
2352 void _process(pair<PGRef, DeletingStateRef>,
2353 ThreadPool::TPHandle &) override;
2354 void _clear() override {
2355 remove_queue.clear();
2356 }
2357 } remove_wq;
2358
2359 private:
2360 bool ms_can_fast_dispatch_any() const override { return true; }
2361 bool ms_can_fast_dispatch(const Message *m) const override {
2362 switch (m->get_type()) {
2363 case CEPH_MSG_OSD_OP:
2364 case CEPH_MSG_OSD_BACKOFF:
2365 case MSG_OSD_SUBOP:
2366 case MSG_OSD_REPOP:
2367 case MSG_OSD_SUBOPREPLY:
2368 case MSG_OSD_REPOPREPLY:
2369 case MSG_OSD_PG_PUSH:
2370 case MSG_OSD_PG_PULL:
2371 case MSG_OSD_PG_PUSH_REPLY:
2372 case MSG_OSD_PG_SCAN:
2373 case MSG_OSD_PG_BACKFILL:
2374 case MSG_OSD_PG_BACKFILL_REMOVE:
2375 case MSG_OSD_EC_WRITE:
2376 case MSG_OSD_EC_WRITE_REPLY:
2377 case MSG_OSD_EC_READ:
2378 case MSG_OSD_EC_READ_REPLY:
2379 case MSG_OSD_SCRUB_RESERVE:
2380 case MSG_OSD_REP_SCRUB:
2381 case MSG_OSD_REP_SCRUBMAP:
2382 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2383 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2384 return true;
2385 default:
2386 return false;
2387 }
2388 }
2389 void ms_fast_dispatch(Message *m) override;
2390 void ms_fast_preprocess(Message *m) override;
2391 bool ms_dispatch(Message *m) override;
2392 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
2393 bool ms_verify_authorizer(Connection *con, int peer_type,
2394 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
2395 bool& isvalid, CryptoKey& session_key) override;
2396 void ms_handle_connect(Connection *con) override;
2397 void ms_handle_fast_connect(Connection *con) override;
2398 void ms_handle_fast_accept(Connection *con) override;
2399 bool ms_handle_reset(Connection *con) override;
2400 void ms_handle_remote_reset(Connection *con) override {}
2401 bool ms_handle_refused(Connection *con) override;
2402
2403 io_queue get_io_queue() const {
2404 if (cct->_conf->osd_op_queue == "debug_random") {
2405 srand(time(NULL));
2406 return (rand() % 2 < 1) ? prioritized : weightedpriority;
2407 } else if (cct->_conf->osd_op_queue == "wpq") {
2408 return weightedpriority;
2409 } else {
2410 return prioritized;
2411 }
2412 }
2413
2414 unsigned int get_io_prio_cut() const {
2415 if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2416 srand(time(NULL));
2417 return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
2418 } else if (cct->_conf->osd_op_queue_cut_off == "low") {
2419 return CEPH_MSG_PRIO_LOW;
2420 } else {
2421 return CEPH_MSG_PRIO_HIGH;
2422 }
2423 }
2424
2425 public:
2426 /* internal and external can point to the same messenger, they will still
2427 * be cleaned up properly*/
2428 OSD(CephContext *cct_,
2429 ObjectStore *store_,
2430 int id,
2431 Messenger *internal,
2432 Messenger *external,
2433 Messenger *hb_front_client,
2434 Messenger *hb_back_client,
2435 Messenger *hb_front_server,
2436 Messenger *hb_back_server,
2437 Messenger *osdc_messenger,
2438 MonClient *mc, const std::string &dev, const std::string &jdev);
2439 ~OSD() override;
2440
2441 // static bits
2442 static int mkfs(CephContext *cct, ObjectStore *store,
2443 const string& dev,
2444 uuid_d fsid, int whoami);
2445 /* remove any non-user xattrs from a map of them */
2446 void filter_xattrs(map<string, bufferptr>& attrs) {
2447 for (map<string, bufferptr>::iterator iter = attrs.begin();
2448 iter != attrs.end();
2449 ) {
2450 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2451 attrs.erase(iter++);
2452 else ++iter;
2453 }
2454 }
2455
2456private:
2457 int mon_cmd_maybe_osd_create(string &cmd);
2458 int update_crush_device_class();
2459 int update_crush_location();
2460
2461 static int write_meta(ObjectStore *store,
2462 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2463
2464 void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
2465 void handle_scrub(struct MOSDScrub *m);
2466 void handle_osd_ping(class MOSDPing *m);
2467
2468 int init_op_flags(OpRequestRef& op);
2469
2470public:
2471 static int peek_meta(ObjectStore *store, string& magic,
2472 uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
2473
2474
2475 // startup/shutdown
2476 int pre_init();
2477 int init();
2478 void final_init();
2479
2480 int enable_disable_fuse(bool stop);
2481
2482 void suicide(int exitcode);
2483 int shutdown();
2484
2485 void handle_signal(int signum);
2486
2487 /// check if we can throw out op from a disconnected client
2488 static bool op_is_discardable(const MOSDOp *m);
2489
2490public:
2491 OSDService service;
2492 friend class OSDService;
2493};
2494
2495//compatibility of the executable
2496extern const CompatSet::Feature ceph_osd_feature_compat[];
2497extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2498extern const CompatSet::Feature ceph_osd_feature_incompat[];
2499
2500#endif