// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include "messages/MMgrDigest.h"
#include "messages/MMonMgrReport.h"
#include "messages/MPGStats.h"

#include "mgr/ClusterState.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "

ClusterState::ClusterState(
  MonClient *monc_,
  Objecter *objecter_,
  const MgrMap& mgrmap)
  : monc(monc_),
    objecter(objecter_),
    lock("ClusterState"),
    mgr_map(mgrmap)
{}

void ClusterState::set_objecter(Objecter *objecter_)
{
  std::lock_guard l(lock);

  objecter = objecter_;
}

void ClusterState::set_fsmap(FSMap const &new_fsmap)
{
  std::lock_guard l(lock);

  fsmap = new_fsmap;
}

void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
{
  std::lock_guard l(lock);
  mgr_map = new_mgrmap;
}

void ClusterState::set_service_map(ServiceMap const &new_service_map)
{
  std::lock_guard l(lock);
  servicemap = new_service_map;
}

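// Stash the health and mon status JSON blobs carried by the mon's
// periodic digest message.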
void ClusterState::load_digest(MMgrDigest *m)
{
  health_json = std::move(m->health_json);
  mon_status_json = std::move(m->mon_status_json);
}

void ClusterState::ingest_pgstats(MPGStats *stats)
{
  std::lock_guard l(lock);

  const int from = stats->get_orig_source().num();
  pending_inc.update_stat(from, std::move(stats->osd_stat));

  for (auto p : stats->pg_stat) {
    pg_t pgid = p.first;
    const auto &pg_stats = p.second;

    // In case we're hearing about a PG that according to the last
    // OSDMap update should not exist
    auto r = existing_pools.find(pgid.pool());
    if (r == existing_pools.end()) {
      dout(15) << " got " << pgid
               << " reported at " << pg_stats.reported_epoch << ":"
               << pg_stats.reported_seq
               << " state " << pg_state_string(pg_stats.state)
               << " but pool not in " << existing_pools
               << dendl;
      continue;
    }
    if (pgid.ps() >= r->second) {
      dout(15) << " got " << pgid
               << " reported at " << pg_stats.reported_epoch << ":"
               << pg_stats.reported_seq
               << " state " << pg_state_string(pg_stats.state)
               << " but > pg_num " << r->second
               << dendl;
      continue;
    }
    // In case we already heard about more recent stats from this PG
    // from another OSD
    const auto q = pg_map.pg_stat.find(pgid);
    if (q != pg_map.pg_stat.end() &&
        q->second.get_version_pair() > pg_stats.get_version_pair()) {
      dout(15) << " had " << pgid << " from "
               << q->second.reported_epoch << ":"
               << q->second.reported_seq << dendl;
      continue;
    }

    pending_inc.pg_stat_updates[pgid] = pg_stats;
  }
  for (auto p : stats->pool_stat) {
    pending_inc.pool_statfs_updates[std::make_pair(p.first, from)] = p.second;
  }
}

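// Fold the stats accumulated in pending_inc into pg_map and start a
// fresh Incremental.  Note that each dout(30) below opens a block that
// the matching dendl closes, so the two JSONFormatter locals live in
// separate scopes and do not collide.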
void ClusterState::update_delta_stats()
{
  pending_inc.stamp = ceph_clock_now();
  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
  dout(10) << " v" << pending_inc.version << dendl;

  dout(30) << " pg_map before:\n";
  JSONFormatter jf(true);
  jf.dump_object("pg_map", pg_map);
  jf.flush(*_dout);
  *_dout << dendl;
  dout(30) << " incremental:\n";
  JSONFormatter jf(true);
  jf.dump_object("pending_inc", pending_inc);
  jf.flush(*_dout);
  *_dout << dendl;
  pg_map.apply_incremental(g_ceph_context, pending_inc);
  pending_inc = PGMap::Incremental();
}

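// Reconcile pg_map with a newly received OSDMap: refresh the
// pool -> pg_num table used to filter incoming PG stats, and recheck
// all PGs for down OSDs.  Caller must hold `lock`.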
void ClusterState::notify_osdmap(const OSDMap &osd_map)
{
  assert(ceph_mutex_is_locked(lock));

  pending_inc.stamp = ceph_clock_now();
  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
  dout(10) << " v" << pending_inc.version << dendl;

  PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);

  // update our list of pools that exist, so that we can filter pg_map updates
  // in synchrony with this OSDMap.
  existing_pools.clear();
  for (auto& p : osd_map.get_pools()) {
    existing_pools[p.first] = p.second.get_pg_num();
  }

  // brute force this for now (don't bother being clever by only
  // checking osds that went up/down)
  set<int> need_check_down_pg_osds;
  PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
                               need_check_down_pg_osds, &pending_inc);

  dout(30) << " pg_map before:\n";
  JSONFormatter jf(true);
  jf.dump_object("pg_map", pg_map);
  jf.flush(*_dout);
  *_dout << dendl;
  dout(30) << " incremental:\n";
  JSONFormatter jf(true);
  jf.dump_object("pending_inc", pending_inc);
  jf.flush(*_dout);
  *_dout << dendl;

  pg_map.apply_incremental(g_ceph_context, pending_inc);
  pending_inc = PGMap::Incremental();
  // TODO: Complete the separation of PG state handling so
  // that a cut-down set of functionality remains in PGMonitor
  // while the full-blown PGMap lives only here.
}