]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 John Spray <john.spray@inktank.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | ||
14 | #include "messages/MMgrDigest.h" | |
31f18b77 | 15 | #include "messages/MMonMgrReport.h" |
7c673cae FG |
16 | #include "messages/MPGStats.h" |
17 | ||
18 | #include "mgr/ClusterState.h" | |
19 | ||
20 | #define dout_context g_ceph_context | |
21 | #define dout_subsys ceph_subsys_mgr | |
22 | #undef dout_prefix | |
23 | #define dout_prefix *_dout << "mgr " << __func__ << " " | |
24 | ||
25 | ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_) | |
31f18b77 | 26 | : monc(monc_), objecter(objecter_), lock("ClusterState"), pgservice(pg_map) |
7c673cae FG |
27 | {} |
28 | ||
29 | void ClusterState::set_objecter(Objecter *objecter_) | |
30 | { | |
31 | Mutex::Locker l(lock); | |
32 | ||
33 | objecter = objecter_; | |
34 | } | |
35 | ||
36 | void ClusterState::set_fsmap(FSMap const &new_fsmap) | |
37 | { | |
38 | Mutex::Locker l(lock); | |
39 | ||
40 | fsmap = new_fsmap; | |
41 | } | |
42 | ||
43 | void ClusterState::load_digest(MMgrDigest *m) | |
44 | { | |
45 | health_json = std::move(m->health_json); | |
46 | mon_status_json = std::move(m->mon_status_json); | |
47 | } | |
48 | ||
49 | void ClusterState::ingest_pgstats(MPGStats *stats) | |
50 | { | |
51 | Mutex::Locker l(lock); | |
7c673cae FG |
52 | |
53 | const int from = stats->get_orig_source().num(); | |
54 | bool is_in = false; | |
55 | objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){ | |
56 | is_in = osd_map.is_in(from); | |
57 | }); | |
58 | ||
59 | if (is_in) { | |
31f18b77 | 60 | pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat)); |
7c673cae FG |
61 | } else { |
62 | pending_inc.update_stat(from, stats->epoch, osd_stat_t()); | |
63 | } | |
64 | ||
65 | for (auto p : stats->pg_stat) { | |
66 | pg_t pgid = p.first; | |
67 | const auto &pg_stats = p.second; | |
68 | ||
69 | // In case we're hearing about a PG that according to last | |
70 | // OSDMap update should not exist | |
31f18b77 FG |
71 | if (existing_pools.count(pgid.pool()) == 0) { |
72 | dout(15) << " got " << pgid | |
73 | << " reported at " << pg_stats.reported_epoch << ":" | |
7c673cae FG |
74 | << pg_stats.reported_seq |
75 | << " state " << pg_state_string(pg_stats.state) | |
31f18b77 | 76 | << " but pool not in " << existing_pools |
7c673cae FG |
77 | << dendl; |
78 | continue; | |
31f18b77 FG |
79 | } |
80 | // In case we already heard about more recent stats from this PG | |
81 | // from another OSD | |
82 | if (pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) { | |
83 | dout(15) << " had " << pgid << " from " | |
84 | << pg_map.pg_stat[pgid].reported_epoch << ":" | |
7c673cae FG |
85 | << pg_map.pg_stat[pgid].reported_seq << dendl; |
86 | continue; | |
87 | } | |
88 | ||
89 | pending_inc.pg_stat_updates[pgid] = pg_stats; | |
90 | } | |
31f18b77 FG |
91 | } |
92 | ||
93 | void ClusterState::update_delta_stats() | |
94 | { | |
95 | pending_inc.stamp = ceph_clock_now(); | |
96 | pending_inc.version = pg_map.version + 1; // to make apply_incremental happy | |
97 | dout(10) << " v" << pending_inc.version << dendl; | |
98 | ||
99 | dout(30) << " pg_map before:\n"; | |
100 | JSONFormatter jf(true); | |
101 | jf.dump_object("pg_map", pg_map); | |
102 | jf.flush(*_dout); | |
103 | *_dout << dendl; | |
104 | dout(30) << " incremental:\n"; | |
105 | JSONFormatter jf(true); | |
106 | jf.dump_object("pending_inc", pending_inc); | |
107 | jf.flush(*_dout); | |
108 | *_dout << dendl; | |
7c673cae FG |
109 | |
110 | pg_map.apply_incremental(g_ceph_context, pending_inc); | |
31f18b77 | 111 | pending_inc = PGMap::Incremental(); |
7c673cae FG |
112 | } |
113 | ||
114 | void ClusterState::notify_osdmap(const OSDMap &osd_map) | |
115 | { | |
116 | Mutex::Locker l(lock); | |
117 | ||
31f18b77 | 118 | pending_inc.stamp = ceph_clock_now(); |
7c673cae | 119 | pending_inc.version = pg_map.version + 1; // to make apply_incremental happy |
31f18b77 | 120 | dout(10) << " v" << pending_inc.version << dendl; |
7c673cae | 121 | |
31f18b77 FG |
122 | PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc); |
123 | ||
124 | // update our list of pools that exist, so that we can filter pg_map updates | |
125 | // in synchrony with this OSDMap. | |
126 | existing_pools.clear(); | |
127 | for (auto& p : osd_map.get_pools()) { | |
128 | existing_pools.insert(p.first); | |
129 | } | |
7c673cae FG |
130 | |
131 | // brute force this for now (don't bother being clever by only | |
132 | // checking osds that went up/down) | |
133 | set<int> need_check_down_pg_osds; | |
134 | PGMapUpdater::check_down_pgs(osd_map, pg_map, true, | |
135 | need_check_down_pg_osds, &pending_inc); | |
136 | ||
31f18b77 FG |
137 | dout(30) << " pg_map before:\n"; |
138 | JSONFormatter jf(true); | |
139 | jf.dump_object("pg_map", pg_map); | |
140 | jf.flush(*_dout); | |
141 | *_dout << dendl; | |
142 | dout(30) << " incremental:\n"; | |
143 | JSONFormatter jf(true); | |
144 | jf.dump_object("pending_inc", pending_inc); | |
145 | jf.flush(*_dout); | |
146 | *_dout << dendl; | |
7c673cae | 147 | |
31f18b77 FG |
148 | pg_map.apply_incremental(g_ceph_context, pending_inc); |
149 | pending_inc = PGMap::Incremental(); | |
7c673cae FG |
150 | // TODO: Complete the separation of PG state handling so |
151 | // that a cut-down set of functionality remains in PGMonitor | |
152 | // while the full-blown PGMap lives only here. | |
153 | } |