1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_H
16 #define CEPH_OSD_H
17
18 #include "PG.h"
19
20 #include "msg/Dispatcher.h"
21
22 #include "common/Mutex.h"
23 #include "common/RWLock.h"
24 #include "common/Timer.h"
25 #include "common/WorkQueue.h"
26 #include "common/AsyncReserver.h"
27 #include "common/ceph_context.h"
28 #include "common/zipkin_trace.h"
29
30 #include "mgr/MgrClient.h"
31
32 #include "os/ObjectStore.h"
33 #include "OSDCap.h"
34
35 #include "auth/KeyRing.h"
36 #include "osd/ClassHandler.h"
37
38 #include "include/CompatSet.h"
39
40 #include "OpRequest.h"
41 #include "Session.h"
42
43 #include <atomic>
44 #include <map>
45 #include <memory>
46 #include "include/memory.h"
47 using namespace std;
48
49 #include "include/unordered_map.h"
50
51 #include "common/shared_cache.hpp"
52 #include "common/simple_cache.hpp"
53 #include "common/sharedptr_registry.hpp"
54 #include "common/WeightedPriorityQueue.h"
55 #include "common/PrioritizedQueue.h"
56 #include "messages/MOSDOp.h"
57 #include "include/Spinlock.h"
58 #include "common/EventTrace.h"
59
60 #define CEPH_OSD_PROTOCOL 10 /* cluster internal */
61
62
63 enum {
64 l_osd_first = 10000,
65 l_osd_op_wip,
66 l_osd_op,
67 l_osd_op_inb,
68 l_osd_op_outb,
69 l_osd_op_lat,
70 l_osd_op_process_lat,
71 l_osd_op_prepare_lat,
72 l_osd_op_r,
73 l_osd_op_r_outb,
74 l_osd_op_r_lat,
75 l_osd_op_r_lat_outb_hist,
76 l_osd_op_r_process_lat,
77 l_osd_op_r_prepare_lat,
78 l_osd_op_w,
79 l_osd_op_w_inb,
80 l_osd_op_w_lat,
81 l_osd_op_w_lat_inb_hist,
82 l_osd_op_w_process_lat,
83 l_osd_op_w_prepare_lat,
84 l_osd_op_rw,
85 l_osd_op_rw_inb,
86 l_osd_op_rw_outb,
87 l_osd_op_rw_lat,
88 l_osd_op_rw_lat_inb_hist,
89 l_osd_op_rw_lat_outb_hist,
90 l_osd_op_rw_process_lat,
91 l_osd_op_rw_prepare_lat,
92
93 l_osd_sop,
94 l_osd_sop_inb,
95 l_osd_sop_lat,
96 l_osd_sop_w,
97 l_osd_sop_w_inb,
98 l_osd_sop_w_lat,
99 l_osd_sop_pull,
100 l_osd_sop_pull_lat,
101 l_osd_sop_push,
102 l_osd_sop_push_inb,
103 l_osd_sop_push_lat,
104
105 l_osd_pull,
106 l_osd_push,
107 l_osd_push_outb,
108
109 l_osd_rop,
110
111 l_osd_loadavg,
112 l_osd_buf,
113 l_osd_history_alloc_bytes,
114 l_osd_history_alloc_num,
115 l_osd_cached_crc,
116 l_osd_cached_crc_adjusted,
117 l_osd_missed_crc,
118
119 l_osd_pg,
120 l_osd_pg_primary,
121 l_osd_pg_replica,
122 l_osd_pg_stray,
123 l_osd_hb_to,
124 l_osd_map,
125 l_osd_mape,
126 l_osd_mape_dup,
127
128 l_osd_waiting_for_map,
129
130 l_osd_map_cache_hit,
131 l_osd_map_cache_miss,
132 l_osd_map_cache_miss_low,
133 l_osd_map_cache_miss_low_avg,
134 l_osd_map_bl_cache_hit,
135 l_osd_map_bl_cache_miss,
136
137 l_osd_stat_bytes,
138 l_osd_stat_bytes_used,
139 l_osd_stat_bytes_avail,
140
141 l_osd_copyfrom,
142
143 l_osd_tier_promote,
144 l_osd_tier_flush,
145 l_osd_tier_flush_fail,
146 l_osd_tier_try_flush,
147 l_osd_tier_try_flush_fail,
148 l_osd_tier_evict,
149 l_osd_tier_whiteout,
150 l_osd_tier_dirty,
151 l_osd_tier_clean,
152 l_osd_tier_delay,
153 l_osd_tier_proxy_read,
154 l_osd_tier_proxy_write,
155
156 l_osd_agent_wake,
157 l_osd_agent_skip,
158 l_osd_agent_flush,
159 l_osd_agent_evict,
160
161 l_osd_object_ctx_cache_hit,
162 l_osd_object_ctx_cache_total,
163
164 l_osd_op_cache_hit,
165 l_osd_tier_flush_lat,
166 l_osd_tier_promote_lat,
167 l_osd_tier_r_lat,
168
169 l_osd_pg_info,
170 l_osd_pg_fastinfo,
171 l_osd_pg_biginfo,
172
173 l_osd_last,
174 };
175
176 // RecoveryState perf counters
177 enum {
178 rs_first = 20000,
179 rs_initial_latency,
180 rs_started_latency,
181 rs_reset_latency,
182 rs_start_latency,
183 rs_primary_latency,
184 rs_peering_latency,
185 rs_backfilling_latency,
186 rs_waitremotebackfillreserved_latency,
187 rs_waitlocalbackfillreserved_latency,
188 rs_notbackfilling_latency,
189 rs_repnotrecovering_latency,
190 rs_repwaitrecoveryreserved_latency,
191 rs_repwaitbackfillreserved_latency,
192 rs_reprecovering_latency,
193 rs_activating_latency,
194 rs_waitlocalrecoveryreserved_latency,
195 rs_waitremoterecoveryreserved_latency,
196 rs_recovering_latency,
197 rs_recovered_latency,
198 rs_clean_latency,
199 rs_active_latency,
200 rs_replicaactive_latency,
201 rs_stray_latency,
202 rs_getinfo_latency,
203 rs_getlog_latency,
204 rs_waitactingchange_latency,
205 rs_incomplete_latency,
206 rs_down_latency,
207 rs_getmissing_latency,
208 rs_waitupthru_latency,
209 rs_notrecovering_latency,
210 rs_last,
211 };
212
213 class Messenger;
214 class Message;
215 class MonClient;
216 class PerfCounters;
217 class ObjectStore;
218 class FuseStore;
219 class OSDMap;
220 class MLog;
221 class Objecter;
222
223 class Watch;
224 class PrimaryLogPG;
225
226 class AuthAuthorizeHandlerRegistry;
227
228 class TestOpsSocketHook;
229 struct C_CompleteSplits;
230 struct C_OpenPGs;
231 class LogChannel;
232 class CephContext;
233 typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
234 class MOSDOp;
235
236 class DeletingState {
237 Mutex lock;
238 Cond cond;
239 enum {
240 QUEUED,
241 CLEARING_DIR,
242 CLEARING_WAITING,
243 DELETING_DIR,
244 DELETED_DIR,
245 CANCELED,
246 } status;
247 bool stop_deleting;
248 public:
249 const spg_t pgid;
250 const PGRef old_pg_state;
251 explicit DeletingState(const pair<spg_t, PGRef> &in) :
252 lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
253 pgid(in.first), old_pg_state(in.second) {
254 }
255
256 /// transition status to CLEARING_WAITING
257 bool pause_clearing() {
258 Mutex::Locker l(lock);
259 assert(status == CLEARING_DIR);
260 if (stop_deleting) {
261 status = CANCELED;
262 cond.Signal();
263 return false;
264 }
265 status = CLEARING_WAITING;
266 return true;
267 } ///< @return false if we should cancel deletion
268
269 /// start or resume the clearing - transition the status to CLEARING_DIR
270 bool start_or_resume_clearing() {
271 Mutex::Locker l(lock);
272 assert(
273 status == QUEUED ||
274 status == DELETED_DIR ||
275 status == CLEARING_WAITING);
276 if (stop_deleting) {
277 status = CANCELED;
278 cond.Signal();
279 return false;
280 }
281 status = CLEARING_DIR;
282 return true;
283 } ///< @return false if we should cancel the deletion
284
285 /// transition status to CLEARING_DIR
286 bool resume_clearing() {
287 Mutex::Locker l(lock);
288 assert(status == CLEARING_WAITING);
289 if (stop_deleting) {
290 status = CANCELED;
291 cond.Signal();
292 return false;
293 }
294 status = CLEARING_DIR;
295 return true;
296 } ///< @return false if we should cancel deletion
297
298 /// transition status to DELETING_DIR
299 bool start_deleting() {
300 Mutex::Locker l(lock);
301 assert(status == CLEARING_DIR);
302 if (stop_deleting) {
303 status = CANCELED;
304 cond.Signal();
305 return false;
306 }
307 status = DELETING_DIR;
308 return true;
309 } ///< @return false if we should cancel deletion
310
311 /// signal that collection removal has been fully queued
312 void finish_deleting() {
313 Mutex::Locker l(lock);
314 assert(status == DELETING_DIR);
315 status = DELETED_DIR;
316 cond.Signal();
317 }
318
319 /// try to halt the deletion
320 bool try_stop_deletion() {
321 Mutex::Locker l(lock);
322 stop_deleting = true;
323 /**
324 * If we are in DELETING_DIR or CLEARING_DIR, there are in-progress
325 * operations we have to wait for before continuing. States
326 * CLEARING_WAITING and QUEUED indicate that the remover will check
327 * stop_deleting before queueing any further operations. CANCELED
328 * indicates that the remover has already halted. DELETED_DIR
329 * indicates that the deletion has been fully queued.
330 */
331 while (status == DELETING_DIR || status == CLEARING_DIR)
332 cond.Wait(lock);
333 return status != DELETED_DIR;
334 } ///< @return true if we don't need to recreate the collection
335 };
336 typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
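// A lifecycle sketch for DeletingState (hypothetical example, not part of the
// original header): a remover thread alternates clearing work with the yield
// points above, while another thread may call try_stop_deletion() at any time.
//
//   void remover_loop(DeletingStateRef d) {
//     if (!d->start_or_resume_clearing())
//       return;                        // deletion was cancelled
//     // ... clear a batch of objects ...
//     if (!d->pause_clearing())
//       return;                        // cancelled while yielding
//     if (!d->resume_clearing())
//       return;
//     // ... clear the remaining objects ...
//     if (!d->start_deleting())
//       return;
//     // ... queue removal of the collection itself ...
//     d->finish_deleting();
//   }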
337
338 class OSD;
339
340 struct PGScrub {
341 epoch_t epoch_queued;
342 explicit PGScrub(epoch_t e) : epoch_queued(e) {}
343 ostream &operator<<(ostream &rhs) {
344 return rhs << "PGScrub";
345 }
346 };
347
348 struct PGSnapTrim {
349 epoch_t epoch_queued;
350 explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
351 ostream &operator<<(ostream &rhs) {
352 return rhs << "PGSnapTrim";
353 }
354 };
355
356 struct PGRecovery {
357 epoch_t epoch_queued;
358 uint64_t reserved_pushes;
359 PGRecovery(epoch_t e, uint64_t reserved_pushes)
360 : epoch_queued(e), reserved_pushes(reserved_pushes) {}
361 ostream &operator<<(ostream &rhs) {
362 return rhs << "PGRecovery(epoch=" << epoch_queued
363 << ", reserved_pushes: " << reserved_pushes << ")";
364 }
365 };
366
367 class PGQueueable {
368 typedef boost::variant<
369 OpRequestRef,
370 PGSnapTrim,
371 PGScrub,
372 PGRecovery
373 > QVariant;
374 QVariant qvariant;
375 int cost;
376 unsigned priority;
377 utime_t start_time;
378 entity_inst_t owner;
379 epoch_t map_epoch; ///< an epoch we expect the PG to exist in
380
381 struct RunVis : public boost::static_visitor<> {
382 OSD *osd;
383 PGRef &pg;
384 ThreadPool::TPHandle &handle;
385 RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
386 : osd(osd), pg(pg), handle(handle) {}
387 void operator()(const OpRequestRef &op);
388 void operator()(const PGSnapTrim &op);
389 void operator()(const PGScrub &op);
390 void operator()(const PGRecovery &op);
391 };
392
393 struct StringifyVis : public boost::static_visitor<std::string> {
394 std::string operator()(const OpRequestRef &op) {
395 return stringify(op);
396 }
397 std::string operator()(const PGSnapTrim &op) {
398 return "PGSnapTrim";
399 }
400 std::string operator()(const PGScrub &op) {
401 return "PGScrub";
402 }
403 std::string operator()(const PGRecovery &op) {
404 return "PGRecovery";
405 }
406 };
407 friend ostream& operator<<(ostream& out, const PGQueueable& q) {
408 StringifyVis v;
409 return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
410 << " prio " << q.priority << " cost " << q.cost
411 << " e" << q.map_epoch << ")";
412 }
413
414 public:
415 // cppcheck-suppress noExplicitConstructor
416 PGQueueable(OpRequestRef op, epoch_t e)
417 : qvariant(op), cost(op->get_req()->get_cost()),
418 priority(op->get_req()->get_priority()),
419 start_time(op->get_req()->get_recv_stamp()),
420 owner(op->get_req()->get_source_inst()),
421 map_epoch(e)
422 {}
423 PGQueueable(
424 const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
425 const entity_inst_t &owner, epoch_t e)
426 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
427 owner(owner), map_epoch(e) {}
428 PGQueueable(
429 const PGScrub &op, int cost, unsigned priority, utime_t start_time,
430 const entity_inst_t &owner, epoch_t e)
431 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
432 owner(owner), map_epoch(e) {}
433 PGQueueable(
434 const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
435 const entity_inst_t &owner, epoch_t e)
436 : qvariant(op), cost(cost), priority(priority), start_time(start_time),
437 owner(owner), map_epoch(e) {}
438 const boost::optional<OpRequestRef> maybe_get_op() const {
439 const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
440 return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
441 }
442 uint64_t get_reserved_pushes() const {
443 const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
444 return op ? op->reserved_pushes : 0;
445 }
446 void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
447 RunVis v(osd, pg, handle);
448 boost::apply_visitor(v, qvariant);
449 }
450 unsigned get_priority() const { return priority; }
451 int get_cost() const { return cost; }
452 utime_t get_start_time() const { return start_time; }
453 entity_inst_t get_owner() const { return owner; }
454 epoch_t get_map_epoch() const { return map_epoch; }
455 };
456
457 class OSDService {
458 public:
459 OSD *osd;
460 CephContext *cct;
461 SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
462 ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
463 SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
464 const int whoami;
465 ObjectStore *&store;
466 LogClient &log_client;
467 LogChannelRef clog;
468 PGRecoveryStats &pg_recovery_stats;
469 private:
470 Messenger *&cluster_messenger;
471 Messenger *&client_messenger;
472 public:
473 PerfCounters *&logger;
474 PerfCounters *&recoverystate_perf;
475 MonClient *&monc;
476 ThreadPool::BatchWorkQueue<PG> &peering_wq;
477 GenContextWQ recovery_gen_wq;
478 ClassHandler *&class_handler;
479
480 void enqueue_back(spg_t pgid, PGQueueable qi);
481 void enqueue_front(spg_t pgid, PGQueueable qi);
482
483 void maybe_inject_dispatch_delay() {
484 if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
485 if (rand() % 10000 <
486 g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
487 utime_t t;
488 t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
489 t.sleep();
490 }
491 }
492 }
493
494 private:
495 // -- map epoch lower bound --
496 Mutex pg_epoch_lock;
497 multiset<epoch_t> pg_epochs;
498 map<spg_t,epoch_t> pg_epoch;
499
500 public:
501 void pg_add_epoch(spg_t pgid, epoch_t epoch) {
502 Mutex::Locker l(pg_epoch_lock);
503 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
504 assert(t == pg_epoch.end());
505 pg_epoch[pgid] = epoch;
506 pg_epochs.insert(epoch);
507 }
508 void pg_update_epoch(spg_t pgid, epoch_t epoch) {
509 Mutex::Locker l(pg_epoch_lock);
510 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
511 assert(t != pg_epoch.end());
512 pg_epochs.erase(pg_epochs.find(t->second));
513 t->second = epoch;
514 pg_epochs.insert(epoch);
515 }
516 void pg_remove_epoch(spg_t pgid) {
517 Mutex::Locker l(pg_epoch_lock);
518 map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
519 if (t != pg_epoch.end()) {
520 pg_epochs.erase(pg_epochs.find(t->second));
521 pg_epoch.erase(t);
522 }
523 }
524 epoch_t get_min_pg_epoch() {
525 Mutex::Locker l(pg_epoch_lock);
526 if (pg_epochs.empty())
527 return 0;
528 else
529 return *pg_epochs.begin();
530 }
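// Consumption sketch for the epoch lower bound above (hypothetical example, not
// part of the original header): each PG registers the epoch it is working from,
// and get_min_pg_epoch() gives the oldest epoch still in use, e.g. as a bound on
// how far old maps may be trimmed.
//
//   service.pg_add_epoch(pgid, 100);             // PG registered at epoch 100
//   service.pg_update_epoch(pgid, 120);          // PG has advanced to epoch 120
//   epoch_t floor = service.get_min_pg_epoch();  // min over all registered PGs
//   service.pg_remove_epoch(pgid);               // PG gone; stops contributing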
531
532 private:
533 // -- superblock --
534 Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
535 OSDSuperblock superblock;
536
537 public:
538 OSDSuperblock get_superblock() {
539 Mutex::Locker l(publish_lock);
540 return superblock;
541 }
542 void publish_superblock(const OSDSuperblock &block) {
543 Mutex::Locker l(publish_lock);
544 superblock = block;
545 }
546
547 int get_nodeid() const { return whoami; }
548
549 std::atomic<epoch_t> max_oldest_map;
550 private:
551 OSDMapRef osdmap;
552
553 public:
554 OSDMapRef get_osdmap() {
555 Mutex::Locker l(publish_lock);
556 return osdmap;
557 }
558 epoch_t get_osdmap_epoch() {
559 Mutex::Locker l(publish_lock);
560 return osdmap ? osdmap->get_epoch() : 0;
561 }
562 void publish_map(OSDMapRef map) {
563 Mutex::Locker l(publish_lock);
564 osdmap = map;
565 }
566
567 /*
568 * osdmap - current published map
569 * next_osdmap - pre_published map that is about to be published.
570 *
571 * We use the next_osdmap to send messages and initiate connections,
572 * but only if the target is the same instance as the one in the map
573 * epoch the current user is working from (i.e., the result is
574 * equivalent to what is in next_osdmap).
575 *
576 * This allows the helpers to start ignoring osds that are about to
577 * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
578 * down, without worrying about reopening connections from threads
579 * working from old maps.
580 */
581 private:
582 OSDMapRef next_osdmap;
583 Cond pre_publish_cond;
584
585 public:
586 void pre_publish_map(OSDMapRef map) {
587 Mutex::Locker l(pre_publish_lock);
588 next_osdmap = std::move(map);
589 }
590
591 void activate_map();
592 /// map epochs reserved below
593 map<epoch_t, unsigned> map_reservations;
594
595 /// gets ref to next_osdmap and registers the epoch as reserved
596 OSDMapRef get_nextmap_reserved() {
597 Mutex::Locker l(pre_publish_lock);
598 if (!next_osdmap)
599 return OSDMapRef();
600 epoch_t e = next_osdmap->get_epoch();
601 map<epoch_t, unsigned>::iterator i =
602 map_reservations.insert(make_pair(e, 0)).first;
603 i->second++;
604 return next_osdmap;
605 }
606 /// releases reservation on map
607 void release_map(OSDMapRef osdmap) {
608 Mutex::Locker l(pre_publish_lock);
609 map<epoch_t, unsigned>::iterator i =
610 map_reservations.find(osdmap->get_epoch());
611 assert(i != map_reservations.end());
612 assert(i->second > 0);
613 if (--(i->second) == 0) {
614 map_reservations.erase(i);
615 }
616 pre_publish_cond.Signal();
617 }
618 /// blocks until there are no reserved maps prior to next_osdmap
619 void await_reserved_maps() {
620 Mutex::Locker l(pre_publish_lock);
621 assert(next_osdmap);
622 while (true) {
623 map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
624 if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
625 break;
626 } else {
627 pre_publish_cond.Wait(pre_publish_lock);
628 }
629 }
630 }
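// Usage sketch for the reservation helpers above (hypothetical example, not part
// of the original header): take a reserved reference to next_osdmap, use it to
// address peers, then release it so await_reserved_maps() can make progress.
//
//   OSDMapRef nextmap = service.get_nextmap_reserved();
//   if (nextmap) {
//     // ... look up peers / send messages against nextmap ...
//     service.release_map(nextmap);   // drop the reservation on that epoch
//   }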
631
632 private:
633 Mutex peer_map_epoch_lock;
634 map<int, epoch_t> peer_map_epoch;
635 public:
636 epoch_t get_peer_epoch(int p);
637 epoch_t note_peer_epoch(int p, epoch_t e);
638 void forget_peer_epoch(int p, epoch_t e);
639
640 void send_map(class MOSDMap *m, Connection *con);
641 void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
642 MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
643 OSDSuperblock& superblock);
644 bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
645 const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
646 void share_map(entity_name_t name, Connection *con, epoch_t epoch,
647 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
648 void share_map_peer(int peer, Connection *con,
649 OSDMapRef map = OSDMapRef());
650
651 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
652 pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
653 void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
654 void send_message_osd_cluster(Message *m, Connection *con) {
655 con->send_message(m);
656 }
657 void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
658 con->send_message(m);
659 }
660 void send_message_osd_client(Message *m, Connection *con) {
661 con->send_message(m);
662 }
663 void send_message_osd_client(Message *m, const ConnectionRef& con) {
664 con->send_message(m);
665 }
666 entity_name_t get_cluster_msgr_name() {
667 return cluster_messenger->get_myname();
668 }
669
670 private:
671 // -- scrub scheduling --
672 Mutex sched_scrub_lock;
673 int scrubs_pending;
674 int scrubs_active;
675
676 public:
677 struct ScrubJob {
678 CephContext* cct;
679 /// pg to be scrubbed
680 spg_t pgid;
682 /// the time at which the scrub is scheduled; the scrub may be delayed if the
683 /// system load is too high or the time falls outside the allowed scrub hours
683 utime_t sched_time;
684 /// the hard upper bound of scrub time
685 utime_t deadline;
686 ScrubJob() : cct(nullptr) {}
687 explicit ScrubJob(CephContext* cct, const spg_t& pg,
688 const utime_t& timestamp,
689 double pool_scrub_min_interval = 0,
690 double pool_scrub_max_interval = 0, bool must = true);
691 /// order the jobs by sched_time
692 bool operator<(const ScrubJob& rhs) const;
693 };
694 set<ScrubJob> sched_scrub_pg;
695
696 /// @returns the scrub_reg_stamp used to unregister the scrub job
697 utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
698 double pool_scrub_max_interval, bool must) {
699 ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
700 must);
701 Mutex::Locker l(sched_scrub_lock);
702 sched_scrub_pg.insert(scrub);
703 return scrub.sched_time;
704 }
705 void unreg_pg_scrub(spg_t pgid, utime_t t) {
706 Mutex::Locker l(sched_scrub_lock);
707 size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
708 assert(removed);
709 }
710 bool first_scrub_stamp(ScrubJob *out) {
711 Mutex::Locker l(sched_scrub_lock);
712 if (sched_scrub_pg.empty())
713 return false;
714 set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
715 *out = *iter;
716 return true;
717 }
718 bool next_scrub_stamp(const ScrubJob& next,
719 ScrubJob *out) {
720 Mutex::Locker l(sched_scrub_lock);
721 if (sched_scrub_pg.empty())
722 return false;
723 set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
724 if (iter == sched_scrub_pg.cend())
725 return false;
726 ++iter;
727 if (iter == sched_scrub_pg.cend())
728 return false;
729 *out = *iter;
730 return true;
731 }
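// Iteration sketch (hypothetical example, not part of the original header): the
// scrub scheduler can walk the registered jobs in sched_time order with the two
// helpers above.
//
//   OSDService::ScrubJob job;
//   if (service.first_scrub_stamp(&job)) {
//     do {
//       // ... decide whether job.pgid should be scrubbed now ...
//     } while (service.next_scrub_stamp(job, &job));
//   }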
732
733 void dumps_scrub(Formatter *f) {
734 assert(f != nullptr);
735 Mutex::Locker l(sched_scrub_lock);
736
737 f->open_array_section("scrubs");
738 for (const auto &i: sched_scrub_pg) {
739 f->open_object_section("scrub");
740 f->dump_stream("pgid") << i.pgid;
741 f->dump_stream("sched_time") << i.sched_time;
742 f->dump_stream("deadline") << i.deadline;
743 f->dump_bool("forced", i.sched_time == i.deadline);
744 f->close_section();
745 }
746 f->close_section();
747 }
748
749 bool can_inc_scrubs_pending();
750 bool inc_scrubs_pending();
751 void inc_scrubs_active(bool reserved);
752 void dec_scrubs_pending();
753 void dec_scrubs_active();
754
755 void reply_op_error(OpRequestRef op, int err);
756 void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
757 void handle_misdirected_op(PG *pg, OpRequestRef op);
758
759
760 private:
761 // -- agent shared state --
762 Mutex agent_lock;
763 Cond agent_cond;
764 map<uint64_t, set<PGRef> > agent_queue;
765 set<PGRef>::iterator agent_queue_pos;
766 bool agent_valid_iterator;
767 int agent_ops;
768 int flush_mode_high_count; // while at least one pg is in FLUSH_MODE_HIGH, flush objects at high speed
769 set<hobject_t> agent_oids;
770 bool agent_active;
771 struct AgentThread : public Thread {
772 OSDService *osd;
773 explicit AgentThread(OSDService *o) : osd(o) {}
774 void *entry() override {
775 osd->agent_entry();
776 return NULL;
777 }
778 } agent_thread;
779 bool agent_stop_flag;
780 Mutex agent_timer_lock;
781 SafeTimer agent_timer;
782
783 public:
784 void agent_entry();
785 void agent_stop();
786
787 void _enqueue(PG *pg, uint64_t priority) {
788 if (!agent_queue.empty() &&
789 agent_queue.rbegin()->first < priority)
790 agent_valid_iterator = false; // inserting higher-priority queue
791 set<PGRef>& nq = agent_queue[priority];
792 if (nq.empty())
793 agent_cond.Signal();
794 nq.insert(pg);
795 }
796
797 void _dequeue(PG *pg, uint64_t old_priority) {
798 set<PGRef>& oq = agent_queue[old_priority];
799 set<PGRef>::iterator p = oq.find(pg);
800 assert(p != oq.end());
801 if (p == agent_queue_pos)
802 ++agent_queue_pos;
803 oq.erase(p);
804 if (oq.empty()) {
805 if (agent_queue.rbegin()->first == old_priority)
806 agent_valid_iterator = false;
807 agent_queue.erase(old_priority);
808 }
809 }
810
811 /// enable agent for a pg
812 void agent_enable_pg(PG *pg, uint64_t priority) {
813 Mutex::Locker l(agent_lock);
814 _enqueue(pg, priority);
815 }
816
817 /// adjust priority for an enabled pg
818 void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
819 Mutex::Locker l(agent_lock);
820 assert(new_priority != old_priority);
821 _enqueue(pg, new_priority);
822 _dequeue(pg, old_priority);
823 }
824
825 /// disable agent for a pg
826 void agent_disable_pg(PG *pg, uint64_t old_priority) {
827 Mutex::Locker l(agent_lock);
828 _dequeue(pg, old_priority);
829 }
830
831 /// note start of an async (evict) op
832 void agent_start_evict_op() {
833 Mutex::Locker l(agent_lock);
834 ++agent_ops;
835 }
836
837 /// note finish or cancellation of an async (evict) op
838 void agent_finish_evict_op() {
839 Mutex::Locker l(agent_lock);
840 assert(agent_ops > 0);
841 --agent_ops;
842 agent_cond.Signal();
843 }
844
845 /// note start of an async (flush) op
846 void agent_start_op(const hobject_t& oid) {
847 Mutex::Locker l(agent_lock);
848 ++agent_ops;
849 assert(agent_oids.count(oid) == 0);
850 agent_oids.insert(oid);
851 }
852
853 /// note finish or cancellation of an async (flush) op
854 void agent_finish_op(const hobject_t& oid) {
855 Mutex::Locker l(agent_lock);
856 assert(agent_ops > 0);
857 --agent_ops;
858 assert(agent_oids.count(oid) == 1);
859 agent_oids.erase(oid);
860 agent_cond.Signal();
861 }
862
863 /// check if we are operating on an object
864 bool agent_is_active_oid(const hobject_t& oid) {
865 Mutex::Locker l(agent_lock);
866 return agent_oids.count(oid);
867 }
868
869 /// get count of active agent ops
870 int agent_get_num_ops() {
871 Mutex::Locker l(agent_lock);
872 return agent_ops;
873 }
874
875 void agent_inc_high_count() {
876 Mutex::Locker l(agent_lock);
877 flush_mode_high_count ++;
878 }
879
880 void agent_dec_high_count() {
881 Mutex::Locker l(agent_lock);
882 flush_mode_high_count --;
883 }
884
885 private:
886 /// throttle promotion attempts
887 std::atomic_uint promote_probability_millis{1000}; ///< promote probability in thousandths (out of 1000); a single word for lockless reads
888 PromoteCounter promote_counter;
889 utime_t last_recalibrate;
890 unsigned long promote_max_objects, promote_max_bytes;
891
892 public:
893 bool promote_throttle() {
894 // NOTE: lockless! we rely on the probability being a single word.
895 promote_counter.attempt();
896 if ((unsigned)rand() % 1000 > promote_probability_millis)
897 return true; // yes throttle (no promote)
898 if (promote_max_objects &&
899 promote_counter.objects > promote_max_objects)
900 return true; // yes throttle
901 if (promote_max_bytes &&
902 promote_counter.bytes > promote_max_bytes)
903 return true; // yes throttle
904 return false; // no throttle (promote)
905 }
906 void promote_finish(uint64_t bytes) {
907 promote_counter.finish(bytes);
908 }
909 void promote_throttle_recalibrate();
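// Caller-side sketch (hypothetical example, not part of the original header): a
// tiering read path consults promote_throttle() before promoting and reports the
// size of each completed promotion so recalibration can adjust the probability.
//
//   if (!service.promote_throttle()) {
//     // ... start an async promotion of the object ...
//     // on completion (bytes_promoted is an assumed local):
//     service.promote_finish(bytes_promoted);
//   }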
910
911 // -- Objecter, for tiering reads/writes from/to other OSDs --
912 Objecter *objecter;
913 Finisher objecter_finisher;
914
915 // -- Watch --
916 Mutex watch_lock;
917 SafeTimer watch_timer;
918 uint64_t next_notif_id;
919 uint64_t get_next_id(epoch_t cur_epoch) {
920 Mutex::Locker l(watch_lock);
921 return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
922 }
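// Layout note with a worked example (added for clarity, not in the original
// header): get_next_id() packs the current epoch into the high 32 bits and a
// per-OSD counter into the low 32 bits, keeping ids unique across epochs.
//
//   // e.g. cur_epoch = 0x1234, next_notif_id = 7:
//   //   id = (0x1234ULL << 32) | 7 = 0x0000123400000007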
923
924 // -- Recovery/Backfill Request Scheduling --
925 Mutex recovery_request_lock;
926 SafeTimer recovery_request_timer;
927
928 // For async recovery sleep
929 bool recovery_needs_sleep = true;
930 utime_t recovery_schedule_time = utime_t();
931
932 Mutex recovery_sleep_lock;
933 SafeTimer recovery_sleep_timer;
934
935 // -- tids --
936 // for ops i issue
937 std::atomic_uint last_tid{0};
938 ceph_tid_t get_tid() {
939 return (ceph_tid_t)last_tid++;
940 }
941
942 // -- backfill_reservation --
943 Finisher reserver_finisher;
944 AsyncReserver<spg_t> local_reserver;
945 AsyncReserver<spg_t> remote_reserver;
946
947 // -- pg_temp --
948 private:
949 Mutex pg_temp_lock;
950 map<pg_t, vector<int> > pg_temp_wanted;
951 map<pg_t, vector<int> > pg_temp_pending;
952 void _sent_pg_temp();
953 public:
954 void queue_want_pg_temp(pg_t pgid, vector<int>& want);
955 void remove_want_pg_temp(pg_t pgid);
956 void requeue_pg_temp();
957 void send_pg_temp();
958
959 void send_pg_created(pg_t pgid);
960
961 void queue_for_peering(PG *pg);
962
963 Mutex snap_sleep_lock;
964 SafeTimer snap_sleep_timer;
965
966 Mutex scrub_sleep_lock;
967 SafeTimer scrub_sleep_timer;
968
969 AsyncReserver<spg_t> snap_reserver;
970 void queue_for_snap_trim(PG *pg);
971
972 void queue_for_scrub(PG *pg, bool with_high_priority) {
973 unsigned scrub_queue_priority = pg->scrubber.priority;
974 if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
975 scrub_queue_priority = cct->_conf->osd_client_op_priority;
976 }
977 enqueue_back(
978 pg->info.pgid,
979 PGQueueable(
980 PGScrub(pg->get_osdmap()->get_epoch()),
981 cct->_conf->osd_scrub_cost,
982 scrub_queue_priority,
983 ceph_clock_now(),
984 entity_inst_t(),
985 pg->get_osdmap()->get_epoch()));
986 }
987
988 private:
989 // -- pg recovery and associated throttling --
990 Mutex recovery_lock;
991 list<pair<epoch_t, PGRef> > awaiting_throttle;
992
993 utime_t defer_recovery_until;
994 uint64_t recovery_ops_active;
995 uint64_t recovery_ops_reserved;
996 bool recovery_paused;
997 #ifdef DEBUG_RECOVERY_OIDS
998 map<spg_t, set<hobject_t> > recovery_oids;
999 #endif
1000 bool _recover_now(uint64_t *available_pushes);
1001 void _maybe_queue_recovery();
1002 void _queue_for_recovery(
1003 pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
1004 assert(recovery_lock.is_locked_by_me());
1005 enqueue_back(
1006 p.second->info.pgid,
1007 PGQueueable(
1008 PGRecovery(p.first, reserved_pushes),
1009 cct->_conf->osd_recovery_cost,
1010 cct->_conf->osd_recovery_priority,
1011 ceph_clock_now(),
1012 entity_inst_t(),
1013 p.first));
1014 }
1015 public:
1016 void start_recovery_op(PG *pg, const hobject_t& soid);
1017 void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
1018 bool is_recovery_active();
1019 void release_reserved_pushes(uint64_t pushes) {
1020 Mutex::Locker l(recovery_lock);
1021 assert(recovery_ops_reserved >= pushes);
1022 recovery_ops_reserved -= pushes;
1023 _maybe_queue_recovery();
1024 }
1025 void defer_recovery(float defer_for) {
1026 defer_recovery_until = ceph_clock_now();
1027 defer_recovery_until += defer_for;
1028 }
1029 void pause_recovery() {
1030 Mutex::Locker l(recovery_lock);
1031 recovery_paused = true;
1032 }
1033 bool recovery_is_paused() {
1034 Mutex::Locker l(recovery_lock);
1035 return recovery_paused;
1036 }
1037 void unpause_recovery() {
1038 Mutex::Locker l(recovery_lock);
1039 recovery_paused = false;
1040 _maybe_queue_recovery();
1041 }
1042 void kick_recovery_queue() {
1043 Mutex::Locker l(recovery_lock);
1044 _maybe_queue_recovery();
1045 }
1046 void clear_queued_recovery(PG *pg) {
1047 Mutex::Locker l(recovery_lock);
1048 for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
1049 i != awaiting_throttle.end();
1050 ) {
1051 if (i->second.get() == pg) {
1052 awaiting_throttle.erase(i);
1053 return;
1054 } else {
1055 ++i;
1056 }
1057 }
1058 }
1059 // delayed pg activation
1060 void queue_for_recovery(PG *pg, bool front = false) {
1061 Mutex::Locker l(recovery_lock);
1062 if (front) {
1063 awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
1064 } else {
1065 awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
1066 }
1067 _maybe_queue_recovery();
1068 }
1069 void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
1070 Mutex::Locker l(recovery_lock);
1071 _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
1072 }
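// Flow sketch for the recovery throttle above (hypothetical example, not part of
// the original header): a PG queues itself, _maybe_queue_recovery() grants it
// reserved pushes subject to the global limit, and the PG returns them when its
// recovery work completes.
//
//   service.queue_for_recovery(pg);        // pg waits in awaiting_throttle
//   // ... _maybe_queue_recovery() enqueues a PGRecovery with N reserved pushes ...
//   // ... the PG performs up to N pushes, then returns them:
//   service.release_reserved_pushes(N);    // N: the reserved_pushes it was granted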
1073
1074
1075 // osd map cache (past osd maps)
1076 Mutex map_cache_lock;
1077 SharedLRU<epoch_t, const OSDMap> map_cache;
1078 SimpleLRU<epoch_t, bufferlist> map_bl_cache;
1079 SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
1080
1081 OSDMapRef try_get_map(epoch_t e);
1082 OSDMapRef get_map(epoch_t e) {
1083 OSDMapRef ret(try_get_map(e));
1084 assert(ret);
1085 return ret;
1086 }
1087 OSDMapRef add_map(OSDMap *o) {
1088 Mutex::Locker l(map_cache_lock);
1089 return _add_map(o);
1090 }
1091 OSDMapRef _add_map(OSDMap *o);
1092
1093 void add_map_bl(epoch_t e, bufferlist& bl) {
1094 Mutex::Locker l(map_cache_lock);
1095 return _add_map_bl(e, bl);
1096 }
1097 void pin_map_bl(epoch_t e, bufferlist &bl);
1098 void _add_map_bl(epoch_t e, bufferlist& bl);
1099 bool get_map_bl(epoch_t e, bufferlist& bl) {
1100 Mutex::Locker l(map_cache_lock);
1101 return _get_map_bl(e, bl);
1102 }
1103 bool _get_map_bl(epoch_t e, bufferlist& bl);
1104
1105 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1106 Mutex::Locker l(map_cache_lock);
1107 return _add_map_inc_bl(e, bl);
1108 }
1109 void pin_map_inc_bl(epoch_t e, bufferlist &bl);
1110 void _add_map_inc_bl(epoch_t e, bufferlist& bl);
1111 bool get_inc_map_bl(epoch_t e, bufferlist& bl);
1112
1113 void clear_map_bl_cache_pins(epoch_t e);
1114
1115 void need_heartbeat_peer_update();
1116
1117 void pg_stat_queue_enqueue(PG *pg);
1118 void pg_stat_queue_dequeue(PG *pg);
1119
1120 void init();
1121 void final_init();
1122 void start_shutdown();
1123 void shutdown_reserver();
1124 void shutdown();
1125
1126 private:
1127 // split
1128 Mutex in_progress_split_lock;
1129 map<spg_t, spg_t> pending_splits; // child -> parent
1130 map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
1131 set<spg_t> in_progress_splits; // child
1132
1133 public:
1134 void _start_split(spg_t parent, const set<spg_t> &children);
1135 void start_split(spg_t parent, const set<spg_t> &children) {
1136 Mutex::Locker l(in_progress_split_lock);
1137 return _start_split(parent, children);
1138 }
1139 void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
1140 void complete_split(const set<spg_t> &pgs);
1141 void cancel_pending_splits_for_parent(spg_t parent);
1142 void _cancel_pending_splits_for_parent(spg_t parent);
1143 bool splitting(spg_t pgid);
1144 void expand_pg_num(OSDMapRef old_map,
1145 OSDMapRef new_map);
1146 void _maybe_split_pgid(OSDMapRef old_map,
1147 OSDMapRef new_map,
1148 spg_t pgid);
1149 void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
1150
1151 // -- stats --
1152 Mutex stat_lock;
1153 osd_stat_t osd_stat;
1154 uint32_t seq = 0;
1155
1156 void update_osd_stat(vector<int>& hb_peers);
1157 osd_stat_t get_osd_stat() {
1158 Mutex::Locker l(stat_lock);
1159 ++seq;
1160 osd_stat.up_from = up_epoch;
1161 osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
1162 return osd_stat;
1163 }
1164 uint64_t get_osd_stat_seq() {
1165 Mutex::Locker l(stat_lock);
1166 return osd_stat.seq;
1167 }
1168
1169 // -- OSD Full Status --
1170 private:
1171 friend TestOpsSocketHook;
1172 mutable Mutex full_status_lock;
1173 enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
1174 const char *get_full_state_name(s_names s) const {
1175 switch (s) {
1176 case NONE: return "none";
1177 case NEARFULL: return "nearfull";
1178 case BACKFILLFULL: return "backfillfull";
1179 case FULL: return "full";
1180 case FAILSAFE: return "failsafe";
1181 default: return "???";
1182 }
1183 }
1184 s_names get_full_state(string type) const {
1185 if (type == "none")
1186 return NONE;
1187 else if (type == "failsafe")
1188 return FAILSAFE;
1189 else if (type == "full")
1190 return FULL;
1191 else if (type == "backfillfull")
1192 return BACKFILLFULL;
1193 else if (type == "nearfull")
1194 return NEARFULL;
1195 else
1196 return INVALID;
1197 }
1198 double cur_ratio; ///< current utilization
1199 mutable int64_t injectfull = 0;
1200 s_names injectfull_state = NONE;
1201 float get_failsafe_full_ratio();
1202 void check_full_status(const osd_stat_t &stat);
1203 bool _check_full(s_names type, ostream &ss) const;
1204 public:
1205 bool check_failsafe_full(ostream &ss) const;
1206 bool check_full(ostream &ss) const;
1207 bool check_backfill_full(ostream &ss) const;
1208 bool check_nearfull(ostream &ss) const;
1209 bool is_failsafe_full() const;
1210 bool is_full() const;
1211 bool is_backfillfull() const;
1212 bool is_nearfull() const;
1213 bool need_fullness_update(); ///< osdmap state needs update
1214 void set_injectfull(s_names type, int64_t count);
1215 bool check_osdmap_full(const set<pg_shard_t> &missing_on);
1216
1217
1218 // -- epochs --
1219 private:
1220 mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
1221 epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
1222 epoch_t up_epoch; // _most_recent_ epoch we were marked up
1223 epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
1224 public:
1225 /**
1226 * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1227 * can be NULL if you don't care about them.
1228 */
1229 void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
1230 epoch_t *_bind_epoch) const;
1231 /**
1232 * Set the boot, up, and bind epochs. Any NULL params will not be set.
1233 */
1234 void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1235 const epoch_t *_bind_epoch);
1236 epoch_t get_boot_epoch() const {
1237 epoch_t ret;
1238 retrieve_epochs(&ret, NULL, NULL);
1239 return ret;
1240 }
1241 epoch_t get_up_epoch() const {
1242 epoch_t ret;
1243 retrieve_epochs(NULL, &ret, NULL);
1244 return ret;
1245 }
1246 epoch_t get_bind_epoch() const {
1247 epoch_t ret;
1248 retrieve_epochs(NULL, NULL, &ret);
1249 return ret;
1250 }
1251
1252 // -- stopping --
1253 Mutex is_stopping_lock;
1254 Cond is_stopping_cond;
1255 enum {
1256 NOT_STOPPING,
1257 PREPARING_TO_STOP,
1258 STOPPING };
1259 std::atomic_int state{NOT_STOPPING};
1260 int get_state() {
1261 return state;
1262 }
1263 void set_state(int s) {
1264 state = s;
1265 }
1266 bool is_stopping() const {
1267 return state == STOPPING;
1268 }
1269 bool is_preparing_to_stop() const {
1270 return state == PREPARING_TO_STOP;
1271 }
1272 bool prepare_to_stop();
1273 void got_stop_ack();
1274
1275
1276 #ifdef PG_DEBUG_REFS
1277 Mutex pgid_lock;
1278 map<spg_t, int> pgid_tracker;
1279 map<spg_t, PG*> live_pgs;
1280 void add_pgid(spg_t pgid, PG *pg);
1281 void remove_pgid(spg_t pgid, PG *pg);
1282 void dump_live_pgids();
1283 #endif
1284
1285 explicit OSDService(OSD *osd);
1286 ~OSDService();
1287 };
1288
1289 class OSD : public Dispatcher,
1290 public md_config_obs_t {
1291 /** OSD **/
1292 Mutex osd_lock; // global lock
1293 SafeTimer tick_timer; // safe timer (osd_lock)
1294
1295 // Tick timer for work that does not need osd_lock
1296 Mutex tick_timer_lock;
1297 SafeTimer tick_timer_without_osd_lock;
1298 public:
1299 // config observer bits
1300 const char** get_tracked_conf_keys() const override;
1301 void handle_conf_change(const struct md_config_t *conf,
1302 const std::set <std::string> &changed) override;
1303 void update_log_config();
1304 void check_config();
1305
1306 protected:
1307
1308 static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
1309
1310 AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
1311 AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
1312
1313 Messenger *cluster_messenger;
1314 Messenger *client_messenger;
1315 Messenger *objecter_messenger;
1316 MonClient *monc; // check the "monc helpers" list before accessing directly
1317 MgrClient mgrc;
1318 PerfCounters *logger;
1319 PerfCounters *recoverystate_perf;
1320 ObjectStore *store;
1321 #ifdef HAVE_LIBFUSE
1322 FuseStore *fuse_store = nullptr;
1323 #endif
1324 LogClient log_client;
1325 LogChannelRef clog;
1326
1327 int whoami;
1328 std::string dev_path, journal_path;
1329
1330 bool store_is_rotational = true;
1331
1332 ZTracer::Endpoint trace_endpoint;
1333 void create_logger();
1334 void create_recoverystate_perf();
1335 void tick();
1336 void tick_without_osd_lock();
1337 void _dispatch(Message *m);
1338 void dispatch_op(OpRequestRef op);
1339
1340 void check_osdmap_features(ObjectStore *store);
1341
1342 // asok
1343 friend class OSDSocketHook;
1344 class OSDSocketHook *asok_hook;
1345 bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
1346
1347 public:
1348 ClassHandler *class_handler = nullptr;
1349 int get_nodeid() { return whoami; }
1350
1351 static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1352 char foo[20];
1353 snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1354 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1355 }
1356 static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1357 char foo[22];
1358 snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1359 return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1360 }
1361
1362 static ghobject_t make_snapmapper_oid() {
1363 return ghobject_t(hobject_t(
1364 sobject_t(
1365 object_t("snapmapper"),
1366 0)));
1367 }
1368
1369 static ghobject_t make_pg_log_oid(spg_t pg) {
1370 stringstream ss;
1371 ss << "pglog_" << pg;
1372 string s;
1373 getline(ss, s);
1374 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1375 }
1376
1377 static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1378 stringstream ss;
1379 ss << "pginfo_" << pg;
1380 string s;
1381 getline(ss, s);
1382 return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1383 }
1384 static ghobject_t make_infos_oid() {
1385 hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1386 return ghobject_t(oid);
1387 }
1388 static void recursive_remove_collection(CephContext* cct,
1389 ObjectStore *store,
1390 spg_t pgid,
1391 coll_t tmp);
1392
1393 /**
1394 * get_osd_initial_compat_set()
1395 *
1396 * Get the initial feature set for this OSD. Features
1397 * here are automatically upgraded.
1398 *
1399 * Return value: Initial osd CompatSet
1400 */
1401 static CompatSet get_osd_initial_compat_set();
1402
1403 /**
1404 * get_osd_compat_set()
1405 *
1406 * Get all features supported by this OSD
1407 *
1408 * Return value: CompatSet of all supported features
1409 */
1410 static CompatSet get_osd_compat_set();
1411
1412
1413 private:
1414 class C_Tick;
1415 class C_Tick_WithoutOSDLock;
1416
1417 // -- superblock --
1418 OSDSuperblock superblock;
1419
1420 void write_superblock();
1421 void write_superblock(ObjectStore::Transaction& t);
1422 int read_superblock();
1423
1424 void clear_temp_objects();
1425
1426 CompatSet osd_compat;
1427
1428 // -- state --
1429 public:
1430 typedef enum {
1431 STATE_INITIALIZING = 1,
1432 STATE_PREBOOT,
1433 STATE_BOOTING,
1434 STATE_ACTIVE,
1435 STATE_STOPPING,
1436 STATE_WAITING_FOR_HEALTHY
1437 } osd_state_t;
1438
1439 static const char *get_state_name(int s) {
1440 switch (s) {
1441 case STATE_INITIALIZING: return "initializing";
1442 case STATE_PREBOOT: return "preboot";
1443 case STATE_BOOTING: return "booting";
1444 case STATE_ACTIVE: return "active";
1445 case STATE_STOPPING: return "stopping";
1446 case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1447 default: return "???";
1448 }
1449 }
1450
1451 private:
1452 std::atomic_int state{STATE_INITIALIZING};
1453
1454 public:
1455 int get_state() const {
1456 return state;
1457 }
1458 void set_state(int s) {
1459 state = s;
1460 }
1461 bool is_initializing() const {
1462 return state == STATE_INITIALIZING;
1463 }
1464 bool is_preboot() const {
1465 return state == STATE_PREBOOT;
1466 }
1467 bool is_booting() const {
1468 return state == STATE_BOOTING;
1469 }
1470 bool is_active() const {
1471 return state == STATE_ACTIVE;
1472 }
1473 bool is_stopping() const {
1474 return state == STATE_STOPPING;
1475 }
1476 bool is_waiting_for_healthy() const {
1477 return state == STATE_WAITING_FOR_HEALTHY;
1478 }
1479
1480 private:
1481
1482 ThreadPool peering_tp;
1483 ShardedThreadPool osd_op_tp;
1484 ThreadPool disk_tp;
1485 ThreadPool command_tp;
1486
1487 void set_disk_tp_priority();
1488 void get_latest_osdmap();
1489
1490 // -- sessions --
1491 private:
1492 void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
1493 void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1494
1495 Mutex session_waiting_lock;
1496 set<Session*> session_waiting_for_map;
1497
1498 /// Caller assumes refs for included Sessions
1499 void get_sessions_waiting_for_map(set<Session*> *out) {
1500 Mutex::Locker l(session_waiting_lock);
1501 out->swap(session_waiting_for_map);
1502 }
1503 void register_session_waiting_on_map(Session *session) {
1504 Mutex::Locker l(session_waiting_lock);
1505 if (session_waiting_for_map.insert(session).second) {
1506 session->get();
1507 }
1508 }
1509 void clear_session_waiting_on_map(Session *session) {
1510 Mutex::Locker l(session_waiting_lock);
1511 set<Session*>::iterator i = session_waiting_for_map.find(session);
1512 if (i != session_waiting_for_map.end()) {
1513 (*i)->put();
1514 session_waiting_for_map.erase(i);
1515 }
1516 }
1517 void dispatch_sessions_waiting_on_map() {
1518 set<Session*> sessions_to_check;
1519 get_sessions_waiting_for_map(&sessions_to_check);
1520 for (set<Session*>::iterator i = sessions_to_check.begin();
1521 i != sessions_to_check.end();
1522 sessions_to_check.erase(i++)) {
1523 (*i)->session_dispatch_lock.Lock();
1524 dispatch_session_waiting(*i, osdmap);
1525 (*i)->session_dispatch_lock.Unlock();
1526 (*i)->put();
1527 }
1528 }
1529 void session_handle_reset(Session *session) {
1530 Mutex::Locker l(session->session_dispatch_lock);
1531 clear_session_waiting_on_map(session);
1532
1533 session->clear_backoffs();
1534
1535 /* Messages have connection refs, we need to clear the
1536 * connection->session->message->connection
1537 * cycles which result.
1538 * Bug #12338
1539 */
1540 session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1541 }
1542
1543 private:
1544 /**
1545 * @defgroup monc helpers
1546 * @{
1547 * Right now we only have the one
1548 */
1549
1550 /**
1551 * Ask the Monitors for a sequence of OSDMaps.
1552 *
1553 * @param epoch The epoch to start with when replying
1554 * @param force_request True if this request forces a new subscription to
1555 * the monitors; false if an outstanding request that encompasses it is
1556 * sufficient.
1557 */
1558 void osdmap_subscribe(version_t epoch, bool force_request);
1559 /** @} monc helpers */
1560
1561 // -- heartbeat --
1562 /// information about a heartbeat peer
1563 struct HeartbeatInfo {
1564 int peer; ///< peer
1565 ConnectionRef con_front; ///< peer connection (front)
1566 ConnectionRef con_back; ///< peer connection (back)
1567 utime_t first_tx; ///< time we sent our first ping request
1568 utime_t last_tx; ///< last time we sent a ping request
1569 utime_t last_rx_front; ///< last time we got a ping reply on the front side
1570 utime_t last_rx_back; ///< last time we got a ping reply on the back side
1571 epoch_t epoch; ///< most recent epoch we wanted this peer
1572
1573 bool is_unhealthy(utime_t cutoff) const {
1574 return
1575 ! ((last_rx_front > cutoff ||
1576 (last_rx_front == utime_t() && (last_tx == utime_t() ||
1577 first_tx > cutoff))) &&
1578 (last_rx_back > cutoff ||
1579 (last_rx_back == utime_t() && (last_tx == utime_t() ||
1580 first_tx > cutoff))));
1581 }
1582 bool is_healthy(utime_t cutoff) const {
1583 return last_rx_front > cutoff && last_rx_back > cutoff;
1584 }
1585
1586 };
1587 /// state attached to outgoing heartbeat connections
1588 struct HeartbeatSession : public RefCountedObject {
1589 int peer;
1590 explicit HeartbeatSession(int p) : peer(p) {}
1591 };
1592 Mutex heartbeat_lock;
1593 map<int, int> debug_heartbeat_drops_remaining;
1594 Cond heartbeat_cond;
1595 bool heartbeat_stop;
1596 std::atomic_bool heartbeat_need_update;
1597 map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
1598 utime_t last_mon_heartbeat;
1599 Messenger *hb_front_client_messenger;
1600 Messenger *hb_back_client_messenger;
1601 Messenger *hb_front_server_messenger;
1602 Messenger *hb_back_server_messenger;
1603 utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
1604 double daily_loadavg;
1605
1606 void _add_heartbeat_peer(int p);
1607 void _remove_heartbeat_peer(int p);
1608 bool heartbeat_reset(Connection *con);
1609 void maybe_update_heartbeat_peers();
1610 void reset_heartbeat_peers();
1611 bool heartbeat_peers_need_update() {
1612 return heartbeat_need_update.load();
1613 }
1614 void heartbeat_set_peers_need_update() {
1615 heartbeat_need_update.store(true);
1616 }
1617 void heartbeat_clear_peers_need_update() {
1618 heartbeat_need_update.store(false);
1619 }
1620 void heartbeat();
1621 void heartbeat_check();
1622 void heartbeat_entry();
1623 void need_heartbeat_peer_update();
1624
1625 void heartbeat_kick() {
1626 Mutex::Locker l(heartbeat_lock);
1627 heartbeat_cond.Signal();
1628 }
1629
1630 struct T_Heartbeat : public Thread {
1631 OSD *osd;
1632 explicit T_Heartbeat(OSD *o) : osd(o) {}
1633 void *entry() override {
1634 osd->heartbeat_entry();
1635 return 0;
1636 }
1637 } heartbeat_thread;
1638
1639 public:
1640 bool heartbeat_dispatch(Message *m);
1641
1642 struct HeartbeatDispatcher : public Dispatcher {
1643 OSD *osd;
1644 explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1645
1646 bool ms_can_fast_dispatch_any() const override { return true; }
1647 bool ms_can_fast_dispatch(const Message *m) const override {
1648 switch (m->get_type()) {
1649 case CEPH_MSG_PING:
1650 case MSG_OSD_PING:
1651 return true;
1652 default:
1653 return false;
1654 }
1655 }
1656 void ms_fast_dispatch(Message *m) override {
1657 osd->heartbeat_dispatch(m);
1658 }
1659 bool ms_dispatch(Message *m) override {
1660 return osd->heartbeat_dispatch(m);
1661 }
1662 bool ms_handle_reset(Connection *con) override {
1663 return osd->heartbeat_reset(con);
1664 }
1665 void ms_handle_remote_reset(Connection *con) override {}
1666 bool ms_handle_refused(Connection *con) override {
1667 return osd->ms_handle_refused(con);
1668 }
1669 bool ms_verify_authorizer(Connection *con, int peer_type,
1670 int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1671 bool& isvalid, CryptoKey& session_key) override {
1672 isvalid = true;
1673 return true;
1674 }
1675 } heartbeat_dispatcher;
1676
1677 private:
1678 // -- waiters --
1679 list<OpRequestRef> finished;
1680
1681 void take_waiters(list<OpRequestRef>& ls) {
1682 assert(osd_lock.is_locked());
1683 finished.splice(finished.end(), ls);
1684 }
1685 void do_waiters();
1686
1687 // -- op tracking --
1688 OpTracker op_tracker;
1689 void check_ops_in_flight();
1690 void test_ops(std::string command, std::string args, ostream& ss);
1691 friend class TestOpsSocketHook;
1692 TestOpsSocketHook *test_ops_hook;
1693 friend struct C_CompleteSplits;
1694 friend struct C_OpenPGs;
1695
1696 // -- op queue --
1697 enum io_queue {
1698 prioritized,
1699 weightedpriority
1700 };
1701 const io_queue op_queue;
1702 const unsigned int op_prio_cutoff;
1703
1704 /*
1705 * The ordered op delivery chain is:
1706 *
1707 * fast dispatch -> pqueue back
1708 * pqueue front <-> to_process back
1709 * to_process front -> RunVis(item)
1710 * <- queue_front()
1711 *
1712 * The pqueue is per-shard, and to_process is per pg_slot. Items can be
1713 * pushed back up into to_process and/or pqueue while order is preserved.
1714 *
1715 * Multiple worker threads can operate on each shard.
1716 *
1717 * Under normal circumstances, num_running == to_process.size(). There are
1718 * two times when that is not true: (1) when waiting_for_pg == true and
1719 * to_process is accumulating requests that are waiting for the pg to be
1720 * instantiated; in that case they will all get requeued together by
1721 * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared
1722 * waiting_for_pg, and already requeued the items.
1723 */
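// Walk-through of the chain above (illustrative addition, not part of the
// original header):
//   1. fast dispatch enqueues (pgid, PGQueueable) at the back of the shard's
//      pqueue.
//   2. a worker thread in _process() pops the pqueue front, appends the item
//      to pg_slots[pgid].to_process, drops the shard lock, and takes the PG
//      lock.
//   3. the front of to_process runs via PGQueueable::RunVis; anything that
//      must be retried is put back with _enqueue_front().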
1724 friend class PGQueueable;
1725 class ShardedOpWQ
1726 : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
1727 {
1728 struct ShardData {
1729 Mutex sdata_lock;
1730 Cond sdata_cond;
1731
1732 Mutex sdata_op_ordering_lock; ///< protects all members below
1733
1734 OSDMapRef waiting_for_pg_osdmap;
1735 struct pg_slot {
1736 PGRef pg; ///< cached pg reference [optional]
1737 list<PGQueueable> to_process; ///< order items for this slot
1738 int num_running = 0; ///< _process threads doing pg lookup/lock
1739
1740 /// true if pg does/did not exist. if so all new items go directly to
1741 /// to_process. cleared by prune_pg_waiters.
1742 bool waiting_for_pg = false;
1743
1744 /// incremented by wake_pg_waiters; indicates racing _process threads
1745 /// should bail out (their op has been requeued)
1746 uint64_t requeue_seq = 0;
1747 };
1748
1749 /// map of slots for each spg_t. maintains ordering of items dequeued
1750 /// from pqueue while _process thread drops shard lock to acquire the
1751 /// pg lock. slots are removed only by prune_pg_waiters.
1752 unordered_map<spg_t,pg_slot> pg_slots;
1753
1754 /// priority queue
1755 std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
1756
1757 void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
1758 unsigned priority = item.second.get_priority();
1759 unsigned cost = item.second.get_cost();
1760 if (priority >= cutoff)
1761 pqueue->enqueue_strict_front(
1762 item.second.get_owner(),
1763 priority, item);
1764 else
1765 pqueue->enqueue_front(
1766 item.second.get_owner(),
1767 priority, cost, item);
1768 }
1769
1770 ShardData(
1771 string lock_name, string ordering_lock,
1772 uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
1773 io_queue opqueue)
1774 : sdata_lock(lock_name.c_str(), false, true, false, cct),
1775 sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
1776 false, cct) {
1777 if (opqueue == weightedpriority) {
1778 pqueue = std::unique_ptr
1779 <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1780 new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1781 max_tok_per_prio, min_cost));
1782 } else if (opqueue == prioritized) {
1783 pqueue = std::unique_ptr
1784 <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1785 new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1786 max_tok_per_prio, min_cost));
1787 }
1788 }
1789 };
1790
1791 vector<ShardData*> shard_list;
1792 OSD *osd;
1793 uint32_t num_shards;
1794
1795 public:
1796 ShardedOpWQ(uint32_t pnum_shards,
1797 OSD *o,
1798 time_t ti,
1799 time_t si,
1800 ShardedThreadPool* tp)
1801 : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
1802 osd(o),
1803 num_shards(pnum_shards) {
1804 for (uint32_t i = 0; i < num_shards; i++) {
1805 char lock_name[32] = {0};
1806 snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
1807 char order_lock[32] = {0};
1808 snprintf(order_lock, sizeof(order_lock), "%s.%d",
1809 "OSD:ShardedOpWQ:order:", i);
1810 ShardData* one_shard = new ShardData(
1811 lock_name, order_lock,
1812 osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
1813 osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
1814 shard_list.push_back(one_shard);
1815 }
1816 }
1817 ~ShardedOpWQ() override {
1818 while (!shard_list.empty()) {
1819 delete shard_list.back();
1820 shard_list.pop_back();
1821 }
1822 }
1823
1824 /// wake any pg waiters after a PG is created/instantiated
1825 void wake_pg_waiters(spg_t pgid);
1826
1827 /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
1828 void prune_pg_waiters(OSDMapRef osdmap, int whoami);
1829
1830 /// clear cached PGRef on pg deletion
1831 void clear_pg_pointer(spg_t pgid);
1832
1833 /// clear pg_slots on shutdown
1834 void clear_pg_slots();
1835
1836 /// try to do some work
1837 void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1838
1839 /// enqueue a new item
1840 void _enqueue(pair <spg_t, PGQueueable> item) override;
1841
1842 /// requeue an old item (at the front of the line)
1843 void _enqueue_front(pair <spg_t, PGQueueable> item) override;
1844
1845 void return_waiting_threads() override {
1846 for(uint32_t i = 0; i < num_shards; i++) {
1847 ShardData* sdata = shard_list[i];
1848 assert (NULL != sdata);
1849 sdata->sdata_lock.Lock();
1850 sdata->sdata_cond.Signal();
1851 sdata->sdata_lock.Unlock();
1852 }
1853 }
1854
1855 void dump(Formatter *f) {
1856 for(uint32_t i = 0; i < num_shards; i++) {
1857 ShardData* sdata = shard_list[i];
1858 char lock_name[32] = {0};
1859 snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
1860 assert (NULL != sdata);
1861 sdata->sdata_op_ordering_lock.Lock();
1862 f->open_object_section(lock_name);
1863 sdata->pqueue->dump(f);
1864 f->close_section();
1865 sdata->sdata_op_ordering_lock.Unlock();
1866 }
1867 }
1868
1869 /// Must be called on ops queued back to front
1870 struct Pred {
1871 spg_t pgid;
1872 list<OpRequestRef> *out_ops;
1873 uint64_t reserved_pushes_to_free;
1874 Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
1875 : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
1876 void accumulate(const PGQueueable &op) {
1877 reserved_pushes_to_free += op.get_reserved_pushes();
1878 if (out_ops) {
1879 boost::optional<OpRequestRef> mop = op.maybe_get_op();
1880 if (mop)
1881 out_ops->push_front(*mop);
1882 }
1883 }
1884 bool operator()(const pair<spg_t, PGQueueable> &op) {
1885 if (op.first == pgid) {
1886 accumulate(op.second);
1887 return true;
1888 } else {
1889 return false;
1890 }
1891 }
1892 uint64_t get_reserved_pushes_to_free() const {
1893 return reserved_pushes_to_free;
1894 }
1895 };
1896
1897 bool is_shard_empty(uint32_t thread_index) override {
1898 uint32_t shard_index = thread_index % num_shards;
1899 ShardData* sdata = shard_list[shard_index];
1900 assert(NULL != sdata);
1901 Mutex::Locker l(sdata->sdata_op_ordering_lock);
1902 return sdata->pqueue->empty();
1903 }
1904 } op_shardedwq;
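// Orientation (the dispatch logic itself is in OSD.cc): each queued
// pair<spg_t,PGQueueable> is mapped to one of the num_shards ShardData
// instances by hashing its pgid, so all work for a given PG stays ordered
// through a single shard's pqueue; schematically
//
//   uint32_t shard_index = hash_of(pgid) % num_shards; // hash_of() is a placeholder
//   ShardData *sdata = shard_list[shard_index];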
1905
1906
1907 void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
1908 void dequeue_op(
1909 PGRef pg, OpRequestRef op,
1910 ThreadPool::TPHandle &handle);
1911
1912 // -- peering queue --
1913 struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
1914 list<PG*> peering_queue;
1915 OSD *osd;
1916 set<PG*> in_use;
1917 PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
1918 : ThreadPool::BatchWorkQueue<PG>(
1919 "OSD::PeeringWQ", ti, si, tp), osd(o) {}
1920
1921 void _dequeue(PG *pg) override {
1922 for (list<PG*>::iterator i = peering_queue.begin();
1923 i != peering_queue.end();
1924 ) {
1925 if (*i == pg) {
1926 peering_queue.erase(i++);
1927 pg->put("PeeringWQ");
1928 } else {
1929 ++i;
1930 }
1931 }
1932 }
1933 bool _enqueue(PG *pg) override {
1934 pg->get("PeeringWQ");
1935 peering_queue.push_back(pg);
1936 return true;
1937 }
1938 bool _empty() override {
1939 return peering_queue.empty();
1940 }
1941 void _dequeue(list<PG*> *out) override;
1942 void _process(
1943 const list<PG *> &pgs,
1944 ThreadPool::TPHandle &handle) override {
1945 assert(!pgs.empty());
1946 osd->process_peering_events(pgs, handle);
1947 for (list<PG *>::const_iterator i = pgs.begin();
1948 i != pgs.end();
1949 ++i) {
1950 (*i)->put("PeeringWQ");
1951 }
1952 }
1953 void _process_finish(const list<PG *> &pgs) override {
1954 for (list<PG*>::const_iterator i = pgs.begin();
1955 i != pgs.end();
1956 ++i) {
1957 in_use.erase(*i);
1958 }
1959 }
1960 void _clear() override {
1961 assert(peering_queue.empty());
1962 }
1963 } peering_wq;
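// Rough PeeringWQ lifecycle (a sketch; the queueing call sites are in
// OSD.cc): _enqueue() takes a "PeeringWQ" ref on the PG, a worker thread
// pulls a batch via _dequeue(list<PG*>*) while marking those PGs in_use,
// _process() runs process_peering_events() and drops the refs, and
// _process_finish() clears the in_use markers so the PGs can be batched
// again.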
1964
1965 void process_peering_events(
1966 const list<PG*> &pg,
1967 ThreadPool::TPHandle &handle);
1968
1969 friend class PG;
1970 friend class PrimaryLogPG;
1971
1972
1973 protected:
1974
1975 // -- osd map --
1976 OSDMapRef osdmap;
1977 OSDMapRef get_osdmap() {
1978 return osdmap;
1979 }
1980 epoch_t get_osdmap_epoch() {
1981 return osdmap ? osdmap->get_epoch() : 0;
1982 }
1983
1984 utime_t had_map_since;
1985 RWLock map_lock;
1986 list<OpRequestRef> waiting_for_osdmap;
1987 deque<utime_t> osd_markdown_log;
1988
1989 friend struct send_map_on_destruct;
1990
1991 void wait_for_new_map(OpRequestRef op);
1992 void handle_osd_map(class MOSDMap *m);
1993 void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1994 void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1995 void note_down_osd(int osd);
1996 void note_up_osd(int osd);
1997 friend class C_OnMapCommit;
1998
1999 bool advance_pg(
2000 epoch_t advance_to, PG *pg,
2001 ThreadPool::TPHandle &handle,
2002 PG::RecoveryCtx *rctx,
2003 set<PGRef> *split_pgs
2004 );
2005 void consume_map();
2006 void activate_map();
2007
2008 // osd map cache (past osd maps)
2009 OSDMapRef get_map(epoch_t e) {
2010 return service.get_map(e);
2011 }
2012 OSDMapRef add_map(OSDMap *o) {
2013 return service.add_map(o);
2014 }
2015 void add_map_bl(epoch_t e, bufferlist& bl) {
2016 return service.add_map_bl(e, bl);
2017 }
2018 void pin_map_bl(epoch_t e, bufferlist &bl) {
2019 return service.pin_map_bl(e, bl);
2020 }
2021 bool get_map_bl(epoch_t e, bufferlist& bl) {
2022 return service.get_map_bl(e, bl);
2023 }
2024 void add_map_inc_bl(epoch_t e, bufferlist& bl) {
2025 return service.add_map_inc_bl(e, bl);
2026 }
2027 void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
2028 return service.pin_map_inc_bl(e, bl);
2029 }
2030
2031 protected:
2032 // -- placement groups --
2033 RWLock pg_map_lock; // this lock orders *above* individual PG _locks
2034 ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
2035
2036 map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
2037 PGRecoveryStats pg_recovery_stats;
2038
2039 PGPool _get_pool(int id, OSDMapRef createmap);
2040
2041 PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
2042 PG *_lookup_lock_pg(spg_t pgid);
2043
2044 public:
2045 PG *lookup_lock_pg(spg_t pgid);
2046
2047 protected:
2048 PG *_open_lock_pg(OSDMapRef createmap,
2049 spg_t pg, bool no_lockdep_check=false);
2050 enum res_result {
2051 RES_PARENT, // resurrected a parent
2052 RES_SELF, // resurrected self
2053 RES_NONE // nothing relevant is being deleted
2054 };
2055 res_result _try_resurrect_pg(
2056 OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
2057
2058 PG *_create_lock_pg(
2059 OSDMapRef createmap,
2060 spg_t pgid,
2061 bool hold_map_lock,
2062 bool backfill,
2063 int role,
2064 vector<int>& up, int up_primary,
2065 vector<int>& acting, int acting_primary,
2066 pg_history_t history,
2067 const PastIntervals& pi,
2068 ObjectStore::Transaction& t);
2069
2070 PG* _make_pg(OSDMapRef createmap, spg_t pgid);
2071 void add_newly_split_pg(PG *pg,
2072 PG::RecoveryCtx *rctx);
2073
2074 int handle_pg_peering_evt(
2075 spg_t pgid,
2076 const pg_history_t& orig_history,
2077 const PastIntervals& pi,
2078 epoch_t epoch,
2079 PG::CephPeeringEvtRef evt);
2080
2081 void load_pgs();
2082 void build_past_intervals_parallel();
2083
2084 /// build initial pg history and intervals on create
2085 void build_initial_pg_history(
2086 spg_t pgid,
2087 epoch_t created,
2088 utime_t created_stamp,
2089 pg_history_t *h,
2090 PastIntervals *pi);
2091
2092 /// project pg history from epoch 'from' to now
2093 bool project_pg_history(
2094 spg_t pgid, pg_history_t& h, epoch_t from,
2095 const vector<int>& lastup,
2096 int lastupprimary,
2097 const vector<int>& lastacting,
2098 int lastactingprimary
2099 ); ///< @return false if there was a map gap between from and now
2100
2101 // this must be called, with pg->lock held, whenever a pg is added to pg_map
2102 void wake_pg_waiters(PGRef pg) {
2103 assert(pg->is_locked());
2104 op_shardedwq.wake_pg_waiters(pg->info.pgid);
2105 }
2106 epoch_t last_pg_create_epoch;
2107
2108 void handle_pg_create(OpRequestRef op);
2109
2110 void split_pgs(
2111 PG *parent,
2112 const set<spg_t> &childpgids, set<PGRef> *out_pgs,
2113 OSDMapRef curmap,
2114 OSDMapRef nextmap,
2115 PG::RecoveryCtx *rctx);
2116
2117 // == monitor interaction ==
2118 Mutex mon_report_lock;
2119 utime_t last_mon_report;
2120 utime_t last_pg_stats_sent;
2121
2122 /* if our monitor dies, we want to notice it and reconnect.
2123 * So we keep track of when it last acked our stat updates,
2124 * and if too much time passes (and we've been sending
2125 * more updates) then we can call it dead and reconnect
2126 * elsewhere.
2127 */
2128 utime_t last_pg_stats_ack;
2129 float stats_ack_timeout;
2130 set<uint64_t> outstanding_pg_stats; // which stat updates haven't been acked yet
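// Illustrative shape of that liveness check (the real logic lives in
// OSD.cc; this is only a sketch built from the members above, and the
// comparison is schematic):
//
//   utime_t now = ceph_clock_now();
//   if (!outstanding_pg_stats.empty() &&
//       now - last_pg_stats_ack > stats_ack_timeout) {
//     // monitor looks unresponsive: reconnect elsewhere and resend stats
//   }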
2131
2132 // -- boot --
2133 void start_boot();
2134 void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2135 void _preboot(epoch_t oldest, epoch_t newest);
2136 void _send_boot();
2137 void _collect_metadata(map<string,string> *pmeta);
2138
2139 void start_waiting_for_healthy();
2140 bool _is_healthy();
2141
2142 void send_full_update();
2143
2144 friend struct C_OSD_GetVersion;
2145
2146 // -- alive --
2147 epoch_t up_thru_wanted;
2148
2149 void queue_want_up_thru(epoch_t want);
2150 void send_alive();
2151
2152 // -- full map requests --
2153 epoch_t requested_full_first, requested_full_last;
2154
2155 void request_full_map(epoch_t first, epoch_t last);
2156 void rerequest_full_maps() {
2157 epoch_t first = requested_full_first;
2158 epoch_t last = requested_full_last;
2159 requested_full_first = 0;
2160 requested_full_last = 0;
2161 request_full_map(first, last);
2162 }
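// Example of the intended behavior: if we earlier called
// request_full_map(100, 120) and not all of those epochs arrived (e.g.
// the monitor session was reset), rerequest_full_maps() clears the
// recorded range and re-issues request_full_map(100, 120).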
2163 void got_full_map(epoch_t e);
2164
2165 // -- failures --
2166 map<int,utime_t> failure_queue;
2167 map<int,pair<utime_t,entity_inst_t> > failure_pending;
2168
2169 void requeue_failures();
2170 void send_failures();
2171 void send_still_alive(epoch_t epoch, const entity_inst_t &i);
2172
2173 // -- pg stats --
2174 Mutex pg_stat_queue_lock;
2175 Cond pg_stat_queue_cond;
2176 xlist<PG*> pg_stat_queue;
2177 bool osd_stat_updated;
2178 uint64_t pg_stat_tid, pg_stat_tid_flushed;
2179
2180 void send_pg_stats(const utime_t &now);
2181 void handle_pg_stats_ack(class MPGStatsAck *ack);
2182 void flush_pg_stats();
2183
2184 ceph::coarse_mono_clock::time_point last_sent_beacon;
2185 Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2186 epoch_t min_last_epoch_clean = 0;
2187 // which pgs were scanned for min_lec
2188 std::vector<pg_t> min_last_epoch_clean_pgs;
2189 void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2190
2191 void pg_stat_queue_enqueue(PG *pg) {
2192 pg_stat_queue_lock.Lock();
2193 if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
2194 pg->get("pg_stat_queue");
2195 pg_stat_queue.push_back(&pg->stat_queue_item);
2196 }
2197 osd_stat_updated = true;
2198 pg_stat_queue_lock.Unlock();
2199 }
2200 void pg_stat_queue_dequeue(PG *pg) {
2201 pg_stat_queue_lock.Lock();
2202 if (pg->stat_queue_item.remove_myself())
2203 pg->put("pg_stat_queue");
2204 pg_stat_queue_lock.Unlock();
2205 }
2206 void clear_pg_stat_queue() {
2207 pg_stat_queue_lock.Lock();
2208 while (!pg_stat_queue.empty()) {
2209 PG *pg = pg_stat_queue.front();
2210 pg_stat_queue.pop_front();
2211 pg->put("pg_stat_queue");
2212 }
2213 pg_stat_queue_lock.Unlock();
2214 }
2215
2216 ceph_tid_t get_tid() {
2217 return service.get_tid();
2218 }
2219
2220 // -- generic pg peering --
2221 PG::RecoveryCtx create_context();
2222 void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2223 ThreadPool::TPHandle *handle = NULL);
2224 void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2225 ThreadPool::TPHandle *handle = NULL);
2226 void do_notifies(map<int,
2227 vector<pair<pg_notify_t, PastIntervals> > >&
2228 notify_list,
2229 OSDMapRef map);
2230 void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2231 OSDMapRef map);
2232 void do_infos(map<int,
2233 vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2234 OSDMapRef map);
2235
2236 bool require_mon_peer(const Message *m);
2237 bool require_mon_or_mgr_peer(const Message *m);
2238 bool require_osd_peer(const Message *m);
2239 /***
2240 * Verifies that we were alive in the given epoch, and that
2241 * we still are.
2242 */
2243 bool require_self_aliveness(const Message *m, epoch_t alive_since);
2244 /**
2245 * Verifies that the OSD who sent the given op has the same
2246 * address as in the given map.
2247 * @pre op was sent by an OSD using the cluster messenger
2248 */
2249 bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2250 bool is_fast_dispatch);
2251
2252 bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2253 bool is_fast_dispatch);
2254
2255 void handle_pg_query(OpRequestRef op);
2256 void handle_pg_notify(OpRequestRef op);
2257 void handle_pg_log(OpRequestRef op);
2258 void handle_pg_info(OpRequestRef op);
2259 void handle_pg_trim(OpRequestRef op);
2260
2261 void handle_pg_backfill_reserve(OpRequestRef op);
2262 void handle_pg_recovery_reserve(OpRequestRef op);
2263
2264 void handle_pg_remove(OpRequestRef op);
2265 void _remove_pg(PG *pg);
2266
2267 // -- commands --
2268 struct Command {
2269 vector<string> cmd;
2270 ceph_tid_t tid;
2271 bufferlist indata;
2272 ConnectionRef con;
2273
2274 Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2275 : cmd(c), tid(t), indata(bl), con(co) {}
2276 };
2277 list<Command*> command_queue;
2278 struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2279 OSD *osd;
2280 CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2281 : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2282
2283 bool _empty() override {
2284 return osd->command_queue.empty();
2285 }
2286 bool _enqueue(Command *c) override {
2287 osd->command_queue.push_back(c);
2288 return true;
2289 }
2290 void _dequeue(Command *pg) override {
2291 ceph_abort();
2292 }
2293 Command *_dequeue() override {
2294 if (osd->command_queue.empty())
2295 return NULL;
2296 Command *c = osd->command_queue.front();
2297 osd->command_queue.pop_front();
2298 return c;
2299 }
2300 void _process(Command *c, ThreadPool::TPHandle &) override {
2301 osd->osd_lock.Lock();
2302 if (osd->is_stopping()) {
2303 osd->osd_lock.Unlock();
2304 delete c;
2305 return;
2306 }
2307 osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
2308 osd->osd_lock.Unlock();
2309 delete c;
2310 }
2311 void _clear() override {
2312 while (!osd->command_queue.empty()) {
2313 Command *c = osd->command_queue.front();
2314 osd->command_queue.pop_front();
2315 delete c;
2316 }
2317 }
2318 } command_wq;
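// Orientation (a sketch; the real wiring is handle_command() in OSD.cc):
// an incoming MCommand/MMonCommand is wrapped into a Command and queued,
// roughly
//
//   command_wq.queue(new Command(m->cmd, m->get_tid(), m->get_data(), con));
//
// after which a worker thread executes it via do_command() under
// osd_lock, as _process() above shows.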
2319
2320 void handle_command(class MMonCommand *m);
2321 void handle_command(class MCommand *m);
2322 void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
2323
2324 // -- pg recovery --
2325 void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2326 ThreadPool::TPHandle &handle);
2327
2328
2329 // -- scrubbing --
2330 void sched_scrub();
2331 bool scrub_random_backoff();
2332 bool scrub_load_below_threshold();
2333 bool scrub_time_permit(utime_t now);
2334
2335 // -- removing --
2336 struct RemoveWQ :
2337 public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
2338 CephContext* cct;
2339 ObjectStore *&store;
2340 list<pair<PGRef, DeletingStateRef> > remove_queue;
2341 RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
2342 ThreadPool *tp)
2343 : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
2344 "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
2345
2346 bool _empty() override {
2347 return remove_queue.empty();
2348 }
2349 void _enqueue(pair<PGRef, DeletingStateRef> item) override {
2350 remove_queue.push_back(item);
2351 }
2352 void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
2353 remove_queue.push_front(item);
2354 }
2355 bool _dequeue(pair<PGRef, DeletingStateRef> item) {
2356 ceph_abort();
2357 }
2358 pair<PGRef, DeletingStateRef> _dequeue() override {
2359 assert(!remove_queue.empty());
2360 pair<PGRef, DeletingStateRef> item = remove_queue.front();
2361 remove_queue.pop_front();
2362 return item;
2363 }
2364 void _process(pair<PGRef, DeletingStateRef>,
2365 ThreadPool::TPHandle &) override;
2366 void _clear() override {
2367 remove_queue.clear();
2368 }
2369 } remove_wq;
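// Orientation: PG removal work is queued as a (PGRef, DeletingStateRef)
// pair; the DeletingState lets an in-progress deletion be observed and
// stopped (e.g. when a PG is resurrected, see _try_resurrect_pg above),
// while _process(), defined in OSD.cc, performs the actual object
// removal.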
2370
2371 private:
2372 bool ms_can_fast_dispatch_any() const override { return true; }
2373 bool ms_can_fast_dispatch(const Message *m) const override {
2374 switch (m->get_type()) {
2375 case CEPH_MSG_OSD_OP:
2376 case CEPH_MSG_OSD_BACKOFF:
2377 case MSG_OSD_SUBOP:
2378 case MSG_OSD_REPOP:
2379 case MSG_OSD_SUBOPREPLY:
2380 case MSG_OSD_REPOPREPLY:
2381 case MSG_OSD_PG_PUSH:
2382 case MSG_OSD_PG_PULL:
2383 case MSG_OSD_PG_PUSH_REPLY:
2384 case MSG_OSD_PG_SCAN:
2385 case MSG_OSD_PG_BACKFILL:
2386 case MSG_OSD_PG_BACKFILL_REMOVE:
2387 case MSG_OSD_EC_WRITE:
2388 case MSG_OSD_EC_WRITE_REPLY:
2389 case MSG_OSD_EC_READ:
2390 case MSG_OSD_EC_READ_REPLY:
2391 case MSG_OSD_SCRUB_RESERVE:
2392 case MSG_OSD_REP_SCRUB:
2393 case MSG_OSD_REP_SCRUBMAP:
2394 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2395 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2396 return true;
2397 default:
2398 return false;
2399 }
2400 }
2401 void ms_fast_dispatch(Message *m) override;
2402 void ms_fast_preprocess(Message *m) override;
2403 bool ms_dispatch(Message *m) override;
2404 bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
2405 bool ms_verify_authorizer(Connection *con, int peer_type,
2406 int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
2407 bool& isvalid, CryptoKey& session_key) override;
2408 void ms_handle_connect(Connection *con) override;
2409 void ms_handle_fast_connect(Connection *con) override;
2410 void ms_handle_fast_accept(Connection *con) override;
2411 bool ms_handle_reset(Connection *con) override;
2412 void ms_handle_remote_reset(Connection *con) override {}
2413 bool ms_handle_refused(Connection *con) override;
2414
2415 io_queue get_io_queue() const {
2416 if (cct->_conf->osd_op_queue == "debug_random") {
2417 srand(time(NULL));
2418 return (rand() % 2 < 1) ? prioritized : weightedpriority;
2419 } else if (cct->_conf->osd_op_queue == "wpq") {
2420 return weightedpriority;
2421 } else {
2422 return prioritized;
2423 }
2424 }
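// Mapping of the osd_op_queue option (see ShardData's pqueue setup above):
// "wpq" selects the WeightedPriorityQueue implementation, "debug_random"
// picks one of the two at random, and anything else falls back to the
// PrioritizedQueue. For example, in ceph.conf:
//
//   [osd]
//   osd op queue = wpq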
2425
2426 unsigned int get_io_prio_cut() const {
2427 if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2428 srand(time(NULL));
2429 return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
2430 } else if (cct->_conf->osd_op_queue_cut_off == "low") {
2431 return CEPH_MSG_PRIO_LOW;
2432 } else {
2433 return CEPH_MSG_PRIO_HIGH;
2434 }
2435 }
2436
2437 public:
2438 /* internal and external may point to the same messenger; they will still
2439 * be cleaned up properly. */
2440 OSD(CephContext *cct_,
2441 ObjectStore *store_,
2442 int id,
2443 Messenger *internal,
2444 Messenger *external,
2445 Messenger *hb_front_client,
2446 Messenger *hb_back_client,
2447 Messenger *hb_front_server,
2448 Messenger *hb_back_server,
2449 Messenger *osdc_messenger,
2450 MonClient *mc, const std::string &dev, const std::string &jdev);
2451 ~OSD() override;
2452
2453 // static bits
2454 static int mkfs(CephContext *cct, ObjectStore *store,
2455 const string& dev,
2456 uuid_d fsid, int whoami);
2457 /* remove any non-user xattrs from an xattr map */
2458 void filter_xattrs(map<string, bufferptr>& attrs) {
2459 for (map<string, bufferptr>::iterator iter = attrs.begin();
2460 iter != attrs.end();
2461 ) {
2462 if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2463 attrs.erase(iter++);
2464 else ++iter;
2465 }
2466 }
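// Example of the filter above: keys that don't start with '_' and the
// bare "_" key are erased, so
//   { "_user.x": a, "_": b, "ceph.internal": c }  ->  { "_user.x": a }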
2467
2468 private:
2469 int mon_cmd_maybe_osd_create(string &cmd);
2470 int update_crush_device_class();
2471 int update_crush_location();
2472
2473 static int write_meta(ObjectStore *store,
2474 uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2475
2476 void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
2477 void handle_scrub(struct MOSDScrub *m);
2478 void handle_osd_ping(class MOSDPing *m);
2479
2480 int init_op_flags(OpRequestRef& op);
2481
2482 int get_num_op_shards();
2483 int get_num_op_threads();
2484
2485 public:
2486 static int peek_meta(ObjectStore *store, string& magic,
2487 uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
2488
2489
2490 // startup/shutdown
2491 int pre_init();
2492 int init();
2493 void final_init();
2494
2495 int enable_disable_fuse(bool stop);
2496
2497 void suicide(int exitcode);
2498 int shutdown();
2499
2500 void handle_signal(int signum);
2501
2502 /// check if we can throw out op from a disconnected client
2503 static bool op_is_discardable(const MOSDOp *m);
2504
2505 public:
2506 OSDService service;
2507 friend class OSDService;
2508 };
2509
2510 // compatibility of the executable
2511 extern const CompatSet::Feature ceph_osd_feature_compat[];
2512 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2513 extern const CompatSet::Feature ceph_osd_feature_incompat[];
2514
2515 #endif