]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/ClusterState.cc
8bc88c530f04fd492521745bf1562b68cbbb7276
[ceph.git] / ceph / src / mgr / ClusterState.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "messages/MMgrDigest.h"
15 #include "messages/MMonMgrReport.h"
16 #include "messages/MPGStats.h"
17
18 #include "mgr/ClusterState.h"
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_mgr
22 #undef dout_prefix
23 #define dout_prefix *_dout << "mgr " << __func__ << " "
24
25 ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
26 : monc(monc_), objecter(objecter_), lock("ClusterState"), pgservice(pg_map)
27 {}
28
29 void ClusterState::set_objecter(Objecter *objecter_)
30 {
31 Mutex::Locker l(lock);
32
33 objecter = objecter_;
34 }
35
36 void ClusterState::set_fsmap(FSMap const &new_fsmap)
37 {
38 Mutex::Locker l(lock);
39
40 fsmap = new_fsmap;
41 }
42
43 void ClusterState::load_digest(MMgrDigest *m)
44 {
45 health_json = std::move(m->health_json);
46 mon_status_json = std::move(m->mon_status_json);
47 }
48
49 void ClusterState::ingest_pgstats(MPGStats *stats)
50 {
51 Mutex::Locker l(lock);
52
53 const int from = stats->get_orig_source().num();
54 bool is_in = false;
55 objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){
56 is_in = osd_map.is_in(from);
57 });
58
59 if (is_in) {
60 pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
61 } else {
62 pending_inc.update_stat(from, stats->epoch, osd_stat_t());
63 }
64
65 for (auto p : stats->pg_stat) {
66 pg_t pgid = p.first;
67 const auto &pg_stats = p.second;
68
69 // In case we're hearing about a PG that according to last
70 // OSDMap update should not exist
71 if (existing_pools.count(pgid.pool()) == 0) {
72 dout(15) << " got " << pgid
73 << " reported at " << pg_stats.reported_epoch << ":"
74 << pg_stats.reported_seq
75 << " state " << pg_state_string(pg_stats.state)
76 << " but pool not in " << existing_pools
77 << dendl;
78 continue;
79 }
80 // In case we already heard about more recent stats from this PG
81 // from another OSD
82 if (pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) {
83 dout(15) << " had " << pgid << " from "
84 << pg_map.pg_stat[pgid].reported_epoch << ":"
85 << pg_map.pg_stat[pgid].reported_seq << dendl;
86 continue;
87 }
88
89 pending_inc.pg_stat_updates[pgid] = pg_stats;
90 }
91 }
92
93 void ClusterState::update_delta_stats()
94 {
95 pending_inc.stamp = ceph_clock_now();
96 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
97 dout(10) << " v" << pending_inc.version << dendl;
98
99 dout(30) << " pg_map before:\n";
100 JSONFormatter jf(true);
101 jf.dump_object("pg_map", pg_map);
102 jf.flush(*_dout);
103 *_dout << dendl;
104 dout(30) << " incremental:\n";
105 JSONFormatter jf(true);
106 jf.dump_object("pending_inc", pending_inc);
107 jf.flush(*_dout);
108 *_dout << dendl;
109
110 pg_map.apply_incremental(g_ceph_context, pending_inc);
111 pending_inc = PGMap::Incremental();
112 }
113
114 void ClusterState::notify_osdmap(const OSDMap &osd_map)
115 {
116 Mutex::Locker l(lock);
117
118 pending_inc.stamp = ceph_clock_now();
119 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
120 dout(10) << " v" << pending_inc.version << dendl;
121
122 PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
123
124 // update our list of pools that exist, so that we can filter pg_map updates
125 // in synchrony with this OSDMap.
126 existing_pools.clear();
127 for (auto& p : osd_map.get_pools()) {
128 existing_pools.insert(p.first);
129 }
130
131 // brute force this for now (don't bother being clever by only
132 // checking osds that went up/down)
133 set<int> need_check_down_pg_osds;
134 PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
135 need_check_down_pg_osds, &pending_inc);
136
137 dout(30) << " pg_map before:\n";
138 JSONFormatter jf(true);
139 jf.dump_object("pg_map", pg_map);
140 jf.flush(*_dout);
141 *_dout << dendl;
142 dout(30) << " incremental:\n";
143 JSONFormatter jf(true);
144 jf.dump_object("pending_inc", pending_inc);
145 jf.flush(*_dout);
146 *_dout << dendl;
147
148 pg_map.apply_incremental(g_ceph_context, pending_inc);
149 pending_inc = PGMap::Incremental();
150 // TODO: Complete the separation of PG state handling so
151 // that a cut-down set of functionality remains in PGMonitor
152 // while the full-blown PGMap lives only here.
153 }