]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 John Spray <john.spray@redhat.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | ||
14 | #include "DaemonServer.h" | |
b32b8144 | 15 | #include "mgr/Mgr.h" |
7c673cae | 16 | |
b32b8144 | 17 | #include "include/stringify.h" |
31f18b77 | 18 | #include "include/str_list.h" |
7c673cae | 19 | #include "auth/RotatingKeyRing.h" |
7c673cae FG |
20 | #include "json_spirit/json_spirit_writer.h" |
21 | ||
c07f9fc5 | 22 | #include "mgr/mgr_commands.h" |
b32b8144 | 23 | #include "mgr/OSDHealthMetricCollector.h" |
c07f9fc5 FG |
24 | #include "mon/MonCommand.h" |
25 | ||
7c673cae FG |
26 | #include "messages/MMgrOpen.h" |
27 | #include "messages/MMgrConfigure.h" | |
31f18b77 | 28 | #include "messages/MMonMgrReport.h" |
7c673cae FG |
29 | #include "messages/MCommand.h" |
30 | #include "messages/MCommandReply.h" | |
31 | #include "messages/MPGStats.h" | |
32 | #include "messages/MOSDScrub.h" | |
c07f9fc5 | 33 | #include "messages/MOSDForceRecovery.h" |
224ce89b | 34 | #include "common/errno.h" |
7c673cae FG |
35 | |
36 | #define dout_context g_ceph_context | |
37 | #define dout_subsys ceph_subsys_mgr | |
38 | #undef dout_prefix | |
39 | #define dout_prefix *_dout << "mgr.server " << __func__ << " " | |
40 | ||
c07f9fc5 FG |
41 | |
42 | ||
7c673cae FG |
43 | DaemonServer::DaemonServer(MonClient *monc_, |
44 | Finisher &finisher_, | |
45 | DaemonStateIndex &daemon_state_, | |
46 | ClusterState &cluster_state_, | |
3efd9988 | 47 | PyModuleRegistry &py_modules_, |
7c673cae FG |
48 | LogChannelRef clog_, |
49 | LogChannelRef audit_clog_) | |
50 | : Dispatcher(g_ceph_context), | |
51 | client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes", | |
3efd9988 | 52 | g_conf->get_val<uint64_t>("mgr_client_bytes"))), |
7c673cae | 53 | client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages", |
3efd9988 | 54 | g_conf->get_val<uint64_t>("mgr_client_messages"))), |
7c673cae | 55 | osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes", |
3efd9988 | 56 | g_conf->get_val<uint64_t>("mgr_osd_bytes"))), |
7c673cae | 57 | osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages", |
3efd9988 | 58 | g_conf->get_val<uint64_t>("mgr_osd_messages"))), |
7c673cae | 59 | mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes", |
3efd9988 | 60 | g_conf->get_val<uint64_t>("mgr_mds_bytes"))), |
7c673cae | 61 | mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages", |
3efd9988 | 62 | g_conf->get_val<uint64_t>("mgr_mds_messages"))), |
7c673cae | 63 | mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes", |
3efd9988 | 64 | g_conf->get_val<uint64_t>("mgr_mon_bytes"))), |
7c673cae | 65 | mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages", |
3efd9988 | 66 | g_conf->get_val<uint64_t>("mgr_mon_messages"))), |
7c673cae FG |
67 | msgr(nullptr), |
68 | monc(monc_), | |
69 | finisher(finisher_), | |
70 | daemon_state(daemon_state_), | |
71 | cluster_state(cluster_state_), | |
72 | py_modules(py_modules_), | |
73 | clog(clog_), | |
74 | audit_clog(audit_clog_), | |
b32b8144 | 75 | auth_cluster_registry(g_ceph_context, |
7c673cae FG |
76 | g_conf->auth_supported.empty() ? |
77 | g_conf->auth_cluster_required : | |
78 | g_conf->auth_supported), | |
b32b8144 FG |
79 | auth_service_registry(g_ceph_context, |
80 | g_conf->auth_supported.empty() ? | |
81 | g_conf->auth_service_required : | |
82 | g_conf->auth_supported), | |
3efd9988 FG |
83 | lock("DaemonServer"), |
84 | pgmap_ready(false) | |
85 | { | |
86 | g_conf->add_observer(this); | |
87 | } | |
7c673cae FG |
88 | |
DaemonServer::~DaemonServer() {
  // We own msgr via a raw pointer (allocated in init() with
  // Messenger::create()); release it here.
  delete msgr;
  // Undo the add_observer() done in the constructor.
  // NOTE(review): this runs after msgr is deleted — appears safe as long
  // as config-change callbacks never touch msgr; confirm against
  // handle_conf_change.
  g_conf->remove_observer(this);
}
93 | ||
// Bring up the messenger that daemons and clients connect to.
//
// @param gid          our mgr global id (becomes the messenger's entity name)
// @param client_addr  the address the monitor saw us connect from; used to
//                     fill in address fields bind() could not determine
// @return 0 on success, negative error code if binding public_addr fails
int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
{
  // Initialize Messenger: prefer ms_public_type when set, otherwise the
  // generic ms_type option.
  std::string public_msgr_type = g_conf->ms_public_type.empty() ?
    g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
			   entity_name_t::MGR(gid),
			   "mgr",
			   getpid(), 0);
  // Peers get no durable session state; reconnects start fresh.
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
			      client_byte_throttler.get(),
			      client_msg_throttler.get());

  // servers: a separate byte/message throttle pair per peer type,
  // constructed in the ctor from the corresponding mgr_* options
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
			      osd_byte_throttler.get(),
			      osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
			      mds_byte_throttler.get(),
			      mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
			      mon_byte_throttler.get(),
			      mon_msg_throttler.get());

  int r = msgr->bind(g_conf->public_addr);
  if (r < 0) {
    derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  // Fill in any still-unknown address fields from the address the mon
  // observed for us.
  msgr->set_addr_unknowns(client_addr);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  // Record when we came up.
  started_at = ceph_clock_now();

  return 0;
}
137 | ||
// Address the daemon-facing messenger is bound to.  Only valid after
// init() has run (msgr is nullptr until then).
entity_addr_t DaemonServer::get_myaddr() const
{
  return msgr->get_myaddr();
}
142 | ||
143 | ||
28e407b8 AA |
// Authenticate a connecting peer and, on success, attach a MgrSession
// (with parsed caps) to the connection.
//
// Always returns true ("we handled verification"); the actual accept /
// reject decision is reported through is_valid.
bool DaemonServer::ms_verify_authorizer(
  Connection *con,
  int peer_type,
  int protocol,
  ceph::bufferlist& authorizer_data,
  ceph::bufferlist& authorizer_reply,
  bool& is_valid,
  CryptoKey& session_key,
  std::unique_ptr<AuthAuthorizerChallenge> *challenge)
{
  // Cluster-internal peers (osd/mon/mds/mgr) use the cluster auth
  // registry; everything else (clients) uses the service registry.
  AuthAuthorizeHandler *handler = nullptr;
  if (peer_type == CEPH_ENTITY_TYPE_OSD ||
      peer_type == CEPH_ENTITY_TYPE_MON ||
      peer_type == CEPH_ENTITY_TYPE_MDS ||
      peer_type == CEPH_ENTITY_TYPE_MGR) {
    handler = auth_cluster_registry.get_handler(protocol);
  } else {
    handler = auth_service_registry.get_handler(protocol);
  }
  if (!handler) {
    dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
    is_valid = false;
    return true;
  }

  MgrSessionRef s(new MgrSession(cct));
  s->inst.addr = con->get_peer_addr();
  AuthCapsInfo caps_info;

  // Without rotating service keys (not yet fetched from the mon) we
  // cannot verify anyone, so deny.
  RotatingKeyRing *keys = monc->rotating_secrets.get();
  if (keys) {
    is_valid = handler->verify_authorizer(
      cct, keys,
      authorizer_data,
      authorizer_reply, s->entity_name,
      s->global_id, caps_info,
      session_key,
      nullptr,
      challenge);
  } else {
    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
    is_valid = false;
  }

  if (is_valid) {
    if (caps_info.allow_all) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " allow_all" << dendl;
      s->caps.set_allow_all();
    }
    if (caps_info.caps.length() > 0) {
      bufferlist::iterator p = caps_info.caps.begin();
      string str;
      try {
	::decode(str, p);
      }
      catch (buffer::error& e) {
	// NOTE(review): decode failure is swallowed; str stays empty and
	// the parse below fails, which invalidates the session — but an
	// explicit log here would aid debugging.
      }
      bool success = s->caps.parse(str);
      if (success) {
	dout(10) << " session " << s << " " << s->entity_name
		 << " has caps " << s->caps << " '" << str << "'" << dendl;
      } else {
	dout(10) << " session " << s << " " << s->entity_name
		 << " failed to parse caps '" << str << "'" << dendl;
	is_valid = false;
      }
    }
    // Hand the connection a ref to the session (released via priv handling).
    con->set_priv(s->get());

    // Track OSD connections so commands (scrub etc.) can be routed to
    // the right connection set; see osd_cons usage in handle_command.
    if (peer_type == CEPH_ENTITY_TYPE_OSD) {
      Mutex::Locker l(lock);
      s->osd_id = atoi(s->entity_name.get_id().c_str());
      dout(10) << "registering osd." << s->osd_id << " session "
	       << s << " con " << con << dendl;
      osd_cons[s->osd_id].insert(con);
    }
  }

  return true;
}
225 | ||
226 | ||
227 | bool DaemonServer::ms_get_authorizer(int dest_type, | |
228 | AuthAuthorizer **authorizer, bool force_new) | |
229 | { | |
230 | dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl; | |
231 | ||
232 | if (dest_type == CEPH_ENTITY_TYPE_MON) { | |
233 | return true; | |
234 | } | |
235 | ||
236 | if (force_new) { | |
237 | if (monc->wait_auth_rotating(10) < 0) | |
238 | return false; | |
239 | } | |
240 | ||
241 | *authorizer = monc->build_authorizer(dest_type); | |
242 | dout(20) << "got authorizer " << *authorizer << dendl; | |
243 | return *authorizer != NULL; | |
244 | } | |
245 | ||
31f18b77 FG |
// Connection reset: drop our bookkeeping for OSD peers.
// Returns false so other dispatchers may also observe the reset.
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    // get_priv() hands back an extra ref in addition to the one the
    // MgrSessionRef now holds...
    MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
    if (!session) {
      return false;
    }
    session->put(); // SessionRef takes a ref
    Mutex::Locker l(lock);
    dout(10) << "unregistering osd." << session->osd_id
	     << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    // Also forget it as a stats-reporting daemon connection, if present.
    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}
266 | ||
7c673cae FG |
// Peer actively refused our connection attempt; we take no action.
// Returning false indicates the event was not consumed here.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
272 | ||
// Top-level message dispatch for the daemon-facing messenger.
// Returns true if the message was consumed (and its ref released),
// false for unhandled types.
bool DaemonServer::ms_dispatch(Message *m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling.  It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
    case MSG_PGSTATS:
      // PG stats are folded into cluster state; this may also flip us
      // to "pgmap ready" once every up OSD has reported (maybe_ready).
      cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
      maybe_ready(m->get_source().num());
      m->put();
      return true;
    case MSG_MGR_REPORT:
      return handle_report(static_cast<MMgrReport*>(m));
    case MSG_MGR_OPEN:
      return handle_open(static_cast<MMgrOpen*>(m));
    case MSG_COMMAND:
      return handle_command(static_cast<MCommand*>(m));
    default:
      dout(1) << "Unhandled message type " << m->get_type() << dendl;
      return false;
  };
}
295 | ||
224ce89b WB |
// Record that osd_id has sent its first stats report; once every up OSD
// has reported, declare the PGMap ready and push an immediate report to
// the mon instead of waiting for the next tick.
//
// pgmap_ready is an atomic flag, letting the common post-startup case
// skip the mutex entirely; the slow path re-checks under ::lock.
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    Mutex::Locker l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          osdmap.get_up_osds(up_osds);
        });

      // Which up OSDs have we not yet heard from?
      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
                          reported_osds.begin(), reported_osds.end(),
                          std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
        dout(4) << "all osds have reported, sending PG state to mon" << dendl;
        pgmap_ready = true;
        reported_osds.clear();
        // Avoid waiting for next tick
        send_report();
      } else {
        dout(4) << "still waiting for " << unreported_osds.size() << " osds"
                   " to report in before PGMap is ready" << dendl;
      }
    }
  }
}
331 | ||
7c673cae FG |
// Stop the daemon-facing messenger and wait for its threads to exit.
// The Messenger object itself is freed later, in the destructor.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  dout(10) << "done" << dendl;
}
339 | ||
340 | ||
341 | ||
// A daemon (or service-daemon client) has opened a stats session.
// Sends it our stats configuration, resets/creates its DaemonState,
// and — for service daemons — registers it in the pending service map.
// Consumes m; always returns true.
bool DaemonServer::handle_open(MMgrOpen *m)
{
  Mutex::Locker l(lock);

  // Key is (service name or peer entity type, daemon name).
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.first = m->service_name;
  } else {
    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.second = m->daemon_name;

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  // Tell the peer what stats to send and how often.
  _send_configure(m->get_connection());

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    daemon = daemon_state.get(key);
  }
  if (daemon) {
    // A (re)open invalidates any previously declared perf counter schema.
    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
    Mutex::Locker l(daemon->lock);
    daemon->perf_counters.clear();
  }

  if (m->service_daemon) {
    if (!daemon) {
      dout(4) << "constructing new DaemonState for " << key << dendl;
      daemon = std::make_shared<DaemonState>(daemon_state.types);
      daemon->key = key;
      if (m->daemon_metadata.count("hostname")) {
        daemon->hostname = m->daemon_metadata["hostname"];
      }
      daemon_state.insert(daemon);
    }
    Mutex::Locker l(daemon->lock);
    daemon->service_daemon = true;
    daemon->metadata = m->daemon_metadata;
    daemon->service_status = m->daemon_status;

    // Register (or re-register after a gid change, i.e. daemon restart)
    // in the pending service map and mark it dirty for publication.
    utime_t now = ceph_clock_now();
    auto d = pending_service_map.get_daemon(m->service_name,
					    m->daemon_name);
    if (d->gid != (uint64_t)m->get_source().num()) {
      dout(10) << "registering " << key << " in pending_service_map" << dendl;
      d->gid = m->get_source().num();
      d->addr = m->get_source_addr();
      d->start_epoch = pending_service_map.epoch;
      d->start_stamp = now;
      d->metadata = m->daemon_metadata;
      pending_service_map_dirty = pending_service_map.epoch;
    }
  }

  if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(m->get_connection());
  }

  m->put();
  return true;
}
409 | ||
// Ingest a periodic stats report from a daemon: update its perf
// counters, service status, and (for OSDs) health metrics; notify the
// python modules if the perf counter schema changed.
//
// Reports from peers we have no DaemonState for are rejected: we kick
// off a background metadata fetch (osd/mds) and kill the session so the
// daemon reconnects after the metadata arrives.
bool DaemonServer::handle_report(MMgrReport *m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.first = m->service_name;
  } else {
    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.second = m->daemon_name;

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
	    << dendl;
    m->get_connection()->mark_down();
    m->put();
    return true;
  }

  // Look up the DaemonState
  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  } else {
    // we don't know the hostname at this stage, reject MMgrReport here.
    dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
	    << dendl;

    // issue metadata request in background
    if (!daemon_state.is_updating(key) && 
	(key.first == "osd" || key.first == "mds")) {

      std::ostringstream oss;
      auto c = new MetadataUpdate(daemon_state, key);
      if (key.first == "osd") {
        oss << "{\"prefix\": \"osd metadata\", \"id\": "
            << key.second<< "}";

      } else if (key.first == "mds") {
        c->set_default("addr", stringify(m->get_source_addr()));
        oss << "{\"prefix\": \"mds metadata\", \"who\": \""
            << key.second << "\"}";
 
      } else {
	ceph_abort();
      }

      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
    }
    
    {
      Mutex::Locker l(lock);
      // kill session
      MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
      if (!session) {
	// NOTE(review): this path returns without m->put(); looks like a
	// message leak unless a downstream dispatcher releases unhandled
	// messages — confirm against the Messenger dispatch contract.
	return false;
      }
      m->get_connection()->mark_down();
      // Drop the extra ref get_priv() returned; 'session' still holds one,
      // so the uses below remain safe.
      session->put();

      dout(10) << "unregistering osd." << session->osd_id
	       << " session " << session << " con " << m->get_connection() << dendl;

      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
	osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
	daemon_connections.erase(iter);
      }
    }

    // NOTE(review): m->put() is not called on this return path either;
    // see note above.
    return false;
  }

  // Update the DaemonState
  assert(daemon != nullptr);
  {
    Mutex::Locker l(daemon->lock);
    auto &daemon_counters = daemon->perf_counters;
    daemon_counters.update(m);

    if (daemon->service_daemon) {
      utime_t now = ceph_clock_now();
      if (m->daemon_status) {
        daemon->service_status = *m->daemon_status;
        daemon->service_status_stamp = now;
      }
      // Beacon time is what service-map liveness checks key off.
      daemon->last_service_beacon = now;
    } else if (m->daemon_status) {
      derr << "got status from non-daemon " << key << dendl;
    }
    if (m->get_connection()->peer_is_osd()) {
      // only OSD sends health_checks to me now
      daemon->osd_health_metrics = std::move(m->osd_health_metrics);
    }
  }

  // if there are any schema updates, notify the python modules
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    ostringstream oss;
    oss << key.first << '.' << key.second;
    py_modules.notify_all("perf_schema_update", oss.str());
  }

  m->put();
  return true;
}
524 | ||
7c673cae FG |
525 | |
526 | void DaemonServer::_generate_command_map( | |
527 | map<string,cmd_vartype>& cmdmap, | |
528 | map<string,string> ¶m_str_map) | |
529 | { | |
530 | for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin(); | |
531 | p != cmdmap.end(); ++p) { | |
532 | if (p->first == "prefix") | |
533 | continue; | |
534 | if (p->first == "caps") { | |
535 | vector<string> cv; | |
536 | if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && | |
537 | cv.size() % 2 == 0) { | |
538 | for (unsigned i = 0; i < cv.size(); i += 2) { | |
539 | string k = string("caps_") + cv[i]; | |
540 | param_str_map[k] = cv[i + 1]; | |
541 | } | |
542 | continue; | |
543 | } | |
544 | } | |
545 | param_str_map[p->first] = cmd_vartype_stringify(p->second); | |
546 | } | |
547 | } | |
548 | ||
c07f9fc5 | 549 | const MonCommand *DaemonServer::_get_mgrcommand( |
7c673cae | 550 | const string &cmd_prefix, |
c07f9fc5 | 551 | const std::vector<MonCommand> &cmds) |
7c673cae | 552 | { |
c07f9fc5 FG |
553 | const MonCommand *this_cmd = nullptr; |
554 | for (const auto &cmd : cmds) { | |
555 | if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { | |
556 | this_cmd = &cmd; | |
7c673cae FG |
557 | break; |
558 | } | |
559 | } | |
560 | return this_cmd; | |
561 | } | |
562 | ||
563 | bool DaemonServer::_allowed_command( | |
564 | MgrSession *s, | |
565 | const string &module, | |
566 | const string &prefix, | |
567 | const map<string,cmd_vartype>& cmdmap, | |
568 | const map<string,string>& param_str_map, | |
c07f9fc5 | 569 | const MonCommand *this_cmd) { |
7c673cae FG |
570 | |
571 | if (s->entity_name.is_mon()) { | |
572 | // mon is all-powerful. even when it is forwarding commands on behalf of | |
573 | // old clients; we expect the mon is validating commands before proxying! | |
574 | return true; | |
575 | } | |
576 | ||
577 | bool cmd_r = this_cmd->requires_perm('r'); | |
578 | bool cmd_w = this_cmd->requires_perm('w'); | |
579 | bool cmd_x = this_cmd->requires_perm('x'); | |
580 | ||
581 | bool capable = s->caps.is_capable( | |
582 | g_ceph_context, | |
583 | CEPH_ENTITY_TYPE_MGR, | |
584 | s->entity_name, | |
585 | module, prefix, param_str_map, | |
586 | cmd_r, cmd_w, cmd_x); | |
587 | ||
588 | dout(10) << " " << s->entity_name << " " | |
589 | << (capable ? "" : "not ") << "capable" << dendl; | |
590 | return capable; | |
591 | } | |
592 | ||
593 | bool DaemonServer::handle_command(MCommand *m) | |
594 | { | |
3efd9988 | 595 | Mutex::Locker l(lock); |
7c673cae FG |
596 | int r = 0; |
597 | std::stringstream ss; | |
598 | std::string prefix; | |
599 | ||
600 | assert(lock.is_locked_by_me()); | |
601 | ||
602 | /** | |
603 | * The working data for processing an MCommand. This lives in | |
604 | * a class to enable passing it into other threads for processing | |
605 | * outside of the thread/locks that called handle_command. | |
606 | */ | |
607 | class CommandContext | |
608 | { | |
609 | public: | |
610 | MCommand *m; | |
611 | bufferlist odata; | |
612 | cmdmap_t cmdmap; | |
613 | ||
614 | CommandContext(MCommand *m_) | |
615 | : m(m_) | |
616 | { | |
617 | } | |
618 | ||
619 | ~CommandContext() | |
620 | { | |
621 | m->put(); | |
622 | } | |
623 | ||
624 | void reply(int r, const std::stringstream &ss) | |
625 | { | |
626 | reply(r, ss.str()); | |
627 | } | |
628 | ||
629 | void reply(int r, const std::string &rs) | |
630 | { | |
631 | // Let the connection drop as soon as we've sent our response | |
632 | ConnectionRef con = m->get_connection(); | |
633 | if (con) { | |
634 | con->mark_disposable(); | |
635 | } | |
636 | ||
224ce89b | 637 | dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl; |
7c673cae FG |
638 | if (con) { |
639 | MCommandReply *reply = new MCommandReply(r, rs); | |
640 | reply->set_tid(m->get_tid()); | |
641 | reply->set_data(odata); | |
642 | con->send_message(reply); | |
643 | } | |
644 | } | |
645 | }; | |
646 | ||
647 | /** | |
648 | * A context for receiving a bufferlist/error string from a background | |
649 | * function and then calling back to a CommandContext when it's done | |
650 | */ | |
651 | class ReplyOnFinish : public Context { | |
652 | std::shared_ptr<CommandContext> cmdctx; | |
653 | ||
654 | public: | |
655 | bufferlist from_mon; | |
656 | string outs; | |
657 | ||
658 | ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_) | |
659 | : cmdctx(cmdctx_) | |
660 | {} | |
661 | void finish(int r) override { | |
662 | cmdctx->odata.claim_append(from_mon); | |
663 | cmdctx->reply(r, outs); | |
664 | } | |
665 | }; | |
666 | ||
667 | std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m); | |
668 | ||
669 | MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv())); | |
670 | if (!session) { | |
671 | return true; | |
672 | } | |
673 | session->put(); // SessionRef takes a ref | |
674 | if (session->inst.name == entity_name_t()) | |
675 | session->inst.name = m->get_source(); | |
676 | ||
677 | std::string format; | |
678 | boost::scoped_ptr<Formatter> f; | |
679 | map<string,string> param_str_map; | |
680 | ||
681 | if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) { | |
682 | cmdctx->reply(-EINVAL, ss); | |
683 | return true; | |
684 | } | |
685 | ||
686 | { | |
687 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain")); | |
688 | f.reset(Formatter::create(format)); | |
689 | } | |
690 | ||
691 | cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix); | |
692 | ||
693 | dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl; | |
694 | dout(4) << "prefix=" << prefix << dendl; | |
695 | ||
696 | if (prefix == "get_command_descriptions") { | |
7c673cae | 697 | dout(10) << "reading commands from python modules" << dendl; |
c07f9fc5 | 698 | const auto py_commands = py_modules.get_commands(); |
7c673cae | 699 | |
c07f9fc5 | 700 | int cmdnum = 0; |
7c673cae FG |
701 | JSONFormatter f; |
702 | f.open_object_section("command_descriptions"); | |
c07f9fc5 FG |
703 | |
704 | auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){ | |
7c673cae FG |
705 | ostringstream secname; |
706 | secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; | |
c07f9fc5 FG |
707 | dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring, |
708 | mc.module, mc.req_perms, mc.availability, 0); | |
7c673cae | 709 | cmdnum++; |
c07f9fc5 FG |
710 | }; |
711 | ||
712 | for (const auto &pyc : py_commands) { | |
713 | dump_cmd(pyc); | |
7c673cae | 714 | } |
7c673cae | 715 | |
c07f9fc5 FG |
716 | for (const auto &mgr_cmd : mgr_commands) { |
717 | dump_cmd(mgr_cmd); | |
7c673cae | 718 | } |
c07f9fc5 | 719 | |
7c673cae FG |
720 | f.close_section(); // command_descriptions |
721 | f.flush(cmdctx->odata); | |
722 | cmdctx->reply(0, ss); | |
723 | return true; | |
724 | } | |
725 | ||
726 | // lookup command | |
c07f9fc5 | 727 | const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands); |
7c673cae FG |
728 | _generate_command_map(cmdctx->cmdmap, param_str_map); |
729 | if (!mgr_cmd) { | |
c07f9fc5 | 730 | MonCommand py_command = {"", "", "py", "rw", "cli"}; |
7c673cae FG |
731 | if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap, |
732 | param_str_map, &py_command)) { | |
733 | dout(1) << " access denied" << dendl; | |
31f18b77 FG |
734 | ss << "access denied; does your client key have mgr caps?" |
735 | " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; | |
7c673cae FG |
736 | cmdctx->reply(-EACCES, ss); |
737 | return true; | |
738 | } | |
739 | } else { | |
740 | // validate user's permissions for requested command | |
741 | if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap, | |
742 | param_str_map, mgr_cmd)) { | |
743 | dout(1) << " access denied" << dendl; | |
744 | audit_clog->info() << "from='" << session->inst << "' " | |
745 | << "entity='" << session->entity_name << "' " | |
746 | << "cmd=" << m->cmd << ": access denied"; | |
31f18b77 FG |
747 | ss << "access denied' does your client key have mgr caps?" |
748 | " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; | |
7c673cae FG |
749 | cmdctx->reply(-EACCES, ss); |
750 | return true; | |
751 | } | |
752 | } | |
753 | ||
754 | audit_clog->debug() | |
755 | << "from='" << session->inst << "' " | |
756 | << "entity='" << session->entity_name << "' " | |
757 | << "cmd=" << m->cmd << ": dispatch"; | |
758 | ||
224ce89b WB |
759 | // ---------------- |
760 | // service map commands | |
761 | if (prefix == "service dump") { | |
762 | if (!f) | |
763 | f.reset(Formatter::create("json-pretty")); | |
764 | cluster_state.with_servicemap([&](const ServiceMap &service_map) { | |
765 | f->dump_object("service_map", service_map); | |
766 | }); | |
767 | f->flush(cmdctx->odata); | |
768 | cmdctx->reply(0, ss); | |
769 | return true; | |
770 | } | |
771 | if (prefix == "service status") { | |
772 | if (!f) | |
773 | f.reset(Formatter::create("json-pretty")); | |
774 | // only include state from services that are in the persisted service map | |
775 | f->open_object_section("service_status"); | |
776 | ServiceMap s; | |
777 | cluster_state.with_servicemap([&](const ServiceMap& service_map) { | |
778 | s = service_map; | |
779 | }); | |
780 | for (auto& p : s.services) { | |
781 | f->open_object_section(p.first.c_str()); | |
782 | for (auto& q : p.second.daemons) { | |
783 | f->open_object_section(q.first.c_str()); | |
784 | DaemonKey key(p.first, q.first); | |
785 | assert(daemon_state.exists(key)); | |
786 | auto daemon = daemon_state.get(key); | |
c07f9fc5 | 787 | Mutex::Locker l(daemon->lock); |
224ce89b WB |
788 | f->dump_stream("status_stamp") << daemon->service_status_stamp; |
789 | f->dump_stream("last_beacon") << daemon->last_service_beacon; | |
790 | f->open_object_section("status"); | |
791 | for (auto& r : daemon->service_status) { | |
792 | f->dump_string(r.first.c_str(), r.second); | |
793 | } | |
794 | f->close_section(); | |
795 | f->close_section(); | |
796 | } | |
797 | f->close_section(); | |
798 | } | |
799 | f->close_section(); | |
800 | f->flush(cmdctx->odata); | |
801 | cmdctx->reply(0, ss); | |
802 | return true; | |
803 | } | |
804 | ||
3efd9988 FG |
805 | if (prefix == "config set") { |
806 | std::string key; | |
807 | std::string val; | |
808 | cmd_getval(cct, cmdctx->cmdmap, "key", key); | |
809 | cmd_getval(cct, cmdctx->cmdmap, "value", val); | |
810 | r = cct->_conf->set_val(key, val, true, &ss); | |
811 | if (r == 0) { | |
812 | cct->_conf->apply_changes(nullptr); | |
813 | } | |
814 | cmdctx->reply(0, ss); | |
815 | return true; | |
816 | } | |
817 | ||
7c673cae FG |
818 | // ----------- |
819 | // PG commands | |
820 | ||
821 | if (prefix == "pg scrub" || | |
822 | prefix == "pg repair" || | |
823 | prefix == "pg deep-scrub") { | |
824 | string scrubop = prefix.substr(3, string::npos); | |
825 | pg_t pgid; | |
826 | string pgidstr; | |
827 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); | |
828 | if (!pgid.parse(pgidstr.c_str())) { | |
829 | ss << "invalid pgid '" << pgidstr << "'"; | |
830 | cmdctx->reply(-EINVAL, ss); | |
831 | return true; | |
832 | } | |
833 | bool pg_exists = false; | |
834 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
835 | pg_exists = osdmap.pg_exists(pgid); | |
836 | }); | |
837 | if (!pg_exists) { | |
838 | ss << "pg " << pgid << " dne"; | |
839 | cmdctx->reply(-ENOENT, ss); | |
840 | return true; | |
841 | } | |
842 | int acting_primary = -1; | |
7c673cae FG |
843 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { |
844 | acting_primary = osdmap.get_pg_acting_primary(pgid); | |
7c673cae FG |
845 | }); |
846 | if (acting_primary == -1) { | |
847 | ss << "pg " << pgid << " has no primary osd"; | |
848 | cmdctx->reply(-EAGAIN, ss); | |
849 | return true; | |
850 | } | |
31f18b77 FG |
851 | auto p = osd_cons.find(acting_primary); |
852 | if (p == osd_cons.end()) { | |
853 | ss << "pg " << pgid << " primary osd." << acting_primary | |
854 | << " is not currently connected"; | |
855 | cmdctx->reply(-EAGAIN, ss); | |
856 | } | |
7c673cae | 857 | vector<pg_t> pgs = { pgid }; |
31f18b77 FG |
858 | for (auto& con : p->second) { |
859 | con->send_message(new MOSDScrub(monc->get_fsid(), | |
860 | pgs, | |
861 | scrubop == "repair", | |
862 | scrubop == "deep-scrub")); | |
863 | } | |
7c673cae | 864 | ss << "instructing pg " << pgid << " on osd." << acting_primary |
31f18b77 FG |
865 | << " to " << scrubop; |
866 | cmdctx->reply(0, ss); | |
867 | return true; | |
868 | } else if (prefix == "osd scrub" || | |
869 | prefix == "osd deep-scrub" || | |
870 | prefix == "osd repair") { | |
871 | string whostr; | |
872 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr); | |
873 | vector<string> pvec; | |
874 | get_str_vec(prefix, pvec); | |
875 | ||
876 | set<int> osds; | |
224ce89b | 877 | if (whostr == "*" || whostr == "all" || whostr == "any") { |
31f18b77 FG |
878 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { |
879 | for (int i = 0; i < osdmap.get_max_osd(); i++) | |
880 | if (osdmap.is_up(i)) { | |
881 | osds.insert(i); | |
882 | } | |
883 | }); | |
884 | } else { | |
885 | long osd = parse_osd_id(whostr.c_str(), &ss); | |
886 | if (osd < 0) { | |
887 | ss << "invalid osd '" << whostr << "'"; | |
888 | cmdctx->reply(-EINVAL, ss); | |
889 | return true; | |
890 | } | |
891 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
892 | if (osdmap.is_up(osd)) { | |
893 | osds.insert(osd); | |
894 | } | |
895 | }); | |
896 | if (osds.empty()) { | |
897 | ss << "osd." << osd << " is not up"; | |
898 | cmdctx->reply(-EAGAIN, ss); | |
899 | return true; | |
900 | } | |
901 | } | |
902 | set<int> sent_osds, failed_osds; | |
903 | for (auto osd : osds) { | |
904 | auto p = osd_cons.find(osd); | |
905 | if (p == osd_cons.end()) { | |
906 | failed_osds.insert(osd); | |
907 | } else { | |
908 | sent_osds.insert(osd); | |
909 | for (auto& con : p->second) { | |
910 | con->send_message(new MOSDScrub(monc->get_fsid(), | |
911 | pvec.back() == "repair", | |
912 | pvec.back() == "deep-scrub")); | |
913 | } | |
914 | } | |
915 | } | |
916 | if (failed_osds.size() == osds.size()) { | |
917 | ss << "failed to instruct osd(s) " << osds << " to " << pvec.back() | |
918 | << " (not connected)"; | |
919 | r = -EAGAIN; | |
920 | } else { | |
921 | ss << "instructed osd(s) " << sent_osds << " to " << pvec.back(); | |
922 | if (!failed_osds.empty()) { | |
923 | ss << "; osd(s) " << failed_osds << " were not connected"; | |
924 | } | |
925 | r = 0; | |
926 | } | |
7c673cae FG |
927 | cmdctx->reply(0, ss); |
928 | return true; | |
929 | } else if (prefix == "osd reweight-by-pg" || | |
930 | prefix == "osd reweight-by-utilization" || | |
931 | prefix == "osd test-reweight-by-pg" || | |
932 | prefix == "osd test-reweight-by-utilization") { | |
933 | bool by_pg = | |
934 | prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg"; | |
935 | bool dry_run = | |
936 | prefix == "osd test-reweight-by-pg" || | |
937 | prefix == "osd test-reweight-by-utilization"; | |
938 | int64_t oload; | |
939 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120)); | |
940 | set<int64_t> pools; | |
941 | vector<string> poolnames; | |
942 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames); | |
943 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
944 | for (const auto& poolname : poolnames) { | |
945 | int64_t pool = osdmap.lookup_pg_pool_name(poolname); | |
946 | if (pool < 0) { | |
947 | ss << "pool '" << poolname << "' does not exist"; | |
948 | r = -ENOENT; | |
949 | } | |
950 | pools.insert(pool); | |
951 | } | |
952 | }); | |
953 | if (r) { | |
954 | cmdctx->reply(r, ss); | |
955 | return true; | |
956 | } | |
957 | double max_change = g_conf->mon_reweight_max_change; | |
958 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change); | |
959 | if (max_change <= 0.0) { | |
960 | ss << "max_change " << max_change << " must be positive"; | |
961 | cmdctx->reply(-EINVAL, ss); | |
962 | return true; | |
963 | } | |
964 | int64_t max_osds = g_conf->mon_reweight_max_osds; | |
965 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds); | |
966 | if (max_osds <= 0) { | |
967 | ss << "max_osds " << max_osds << " must be positive"; | |
968 | cmdctx->reply(-EINVAL, ss); | |
969 | return true; | |
970 | } | |
971 | string no_increasing; | |
972 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing); | |
973 | string out_str; | |
974 | mempool::osdmap::map<int32_t, uint32_t> new_weights; | |
975 | r = cluster_state.with_pgmap([&](const PGMap& pgmap) { | |
976 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
977 | return reweight::by_utilization(osdmap, pgmap, | |
978 | oload, | |
979 | max_change, | |
980 | max_osds, | |
981 | by_pg, | |
982 | pools.empty() ? NULL : &pools, | |
983 | no_increasing == "--no-increasing", | |
984 | &new_weights, | |
985 | &ss, &out_str, f.get()); | |
986 | }); | |
987 | }); | |
988 | if (r >= 0) { | |
989 | dout(10) << "reweight::by_utilization: finished with " << out_str << dendl; | |
990 | } | |
991 | if (f) { | |
992 | f->flush(cmdctx->odata); | |
993 | } else { | |
994 | cmdctx->odata.append(out_str); | |
995 | } | |
996 | if (r < 0) { | |
997 | ss << "FAILED reweight-by-pg"; | |
998 | cmdctx->reply(r, ss); | |
999 | return true; | |
1000 | } else if (r == 0 || dry_run) { | |
1001 | ss << "no change"; | |
1002 | cmdctx->reply(r, ss); | |
1003 | return true; | |
1004 | } else { | |
1005 | json_spirit::Object json_object; | |
1006 | for (const auto& osd_weight : new_weights) { | |
1007 | json_spirit::Config::add(json_object, | |
1008 | std::to_string(osd_weight.first), | |
1009 | std::to_string(osd_weight.second)); | |
1010 | } | |
1011 | string s = json_spirit::write(json_object); | |
1012 | std::replace(begin(s), end(s), '\"', '\''); | |
1013 | const string cmd = | |
1014 | "{" | |
1015 | "\"prefix\": \"osd reweightn\", " | |
1016 | "\"weights\": \"" + s + "\"" | |
1017 | "}"; | |
1018 | auto on_finish = new ReplyOnFinish(cmdctx); | |
1019 | monc->start_mon_command({cmd}, {}, | |
1020 | &on_finish->from_mon, &on_finish->outs, on_finish); | |
1021 | return true; | |
1022 | } | |
31f18b77 FG |
1023 | } else if (prefix == "osd df") { |
1024 | string method; | |
1025 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method); | |
1026 | r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) { | |
1027 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1028 | print_osd_utilization(osdmap, &pgservice, ss, | |
1029 | f.get(), method == "tree"); | |
1030 | ||
1031 | cmdctx->odata.append(ss); | |
1032 | return 0; | |
1033 | }); | |
1034 | }); | |
1035 | cmdctx->reply(r, ""); | |
1036 | return true; | |
35e4c445 FG |
1037 | } else if (prefix == "osd safe-to-destroy") { |
1038 | vector<string> ids; | |
1039 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); | |
1040 | set<int> osds; | |
1041 | int r; | |
1042 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1043 | r = osdmap.parse_osd_id_list(ids, &osds, &ss); | |
1044 | }); | |
1045 | if (!r && osds.empty()) { | |
1046 | ss << "must specify one or more OSDs"; | |
1047 | r = -EINVAL; | |
1048 | } | |
1049 | if (r < 0) { | |
1050 | cmdctx->reply(r, ss); | |
1051 | return true; | |
1052 | } | |
1053 | set<int> active_osds, missing_stats, stored_pgs; | |
1054 | int affected_pgs = 0; | |
1055 | cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
1056 | if (pg_map.num_pg_unknown > 0) { | |
1057 | ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw" | |
1058 | << " any conclusions"; | |
1059 | r = -EAGAIN; | |
1060 | return; | |
1061 | } | |
1062 | int num_active_clean = 0; | |
1063 | for (auto& p : pg_map.num_pg_by_state) { | |
1064 | unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN; | |
1065 | if ((p.first & want) == want) { | |
1066 | num_active_clean += p.second; | |
1067 | } | |
1068 | } | |
1069 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1070 | for (auto osd : osds) { | |
1071 | if (!osdmap.exists(osd)) { | |
1072 | continue; // clearly safe to destroy | |
1073 | } | |
1074 | auto q = pg_map.num_pg_by_osd.find(osd); | |
1075 | if (q != pg_map.num_pg_by_osd.end()) { | |
1076 | if (q->second.acting > 0 || q->second.up > 0) { | |
1077 | active_osds.insert(osd); | |
1078 | affected_pgs += q->second.acting + q->second.up; | |
1079 | continue; | |
1080 | } | |
1081 | } | |
1082 | if (num_active_clean < pg_map.num_pg) { | |
1083 | // all pgs aren't active+clean; we need to be careful. | |
1084 | auto p = pg_map.osd_stat.find(osd); | |
1085 | if (p == pg_map.osd_stat.end()) { | |
1086 | missing_stats.insert(osd); | |
1087 | } | |
1088 | if (p->second.num_pgs > 0) { | |
1089 | stored_pgs.insert(osd); | |
1090 | } | |
1091 | } | |
1092 | } | |
1093 | }); | |
1094 | }); | |
1095 | if (!r && !active_osds.empty()) { | |
1096 | ss << "OSD(s) " << active_osds << " have " << affected_pgs | |
1097 | << " pgs currently mapped to them"; | |
1098 | r = -EBUSY; | |
1099 | } else if (!missing_stats.empty()) { | |
1100 | ss << "OSD(s) " << missing_stats << " have no reported stats, and not all" | |
1101 | << " PGs are active+clean; we cannot draw any conclusions"; | |
1102 | r = -EAGAIN; | |
1103 | } else if (!stored_pgs.empty()) { | |
1104 | ss << "OSD(s) " << stored_pgs << " last reported they still store some PG" | |
1105 | << " data, and not all PGs are active+clean; we cannot be sure they" | |
1106 | << " aren't still needed."; | |
1107 | r = -EBUSY; | |
1108 | } | |
1109 | if (r) { | |
1110 | cmdctx->reply(r, ss); | |
1111 | return true; | |
1112 | } | |
1113 | ss << "OSD(s) " << osds << " are safe to destroy without reducing data" | |
1114 | << " durability."; | |
1115 | cmdctx->reply(0, ss); | |
1116 | return true; | |
1117 | } else if (prefix == "osd ok-to-stop") { | |
1118 | vector<string> ids; | |
1119 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); | |
1120 | set<int> osds; | |
1121 | int r; | |
1122 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1123 | r = osdmap.parse_osd_id_list(ids, &osds, &ss); | |
1124 | }); | |
1125 | if (!r && osds.empty()) { | |
1126 | ss << "must specify one or more OSDs"; | |
1127 | r = -EINVAL; | |
1128 | } | |
1129 | if (r < 0) { | |
1130 | cmdctx->reply(r, ss); | |
1131 | return true; | |
1132 | } | |
1133 | map<pg_t,int> pg_delta; // pgid -> net acting set size change | |
1134 | int dangerous_pgs = 0; | |
1135 | cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
1136 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1137 | if (pg_map.num_pg_unknown > 0) { | |
1138 | ss << pg_map.num_pg_unknown << " pgs have unknown state; " | |
1139 | << "cannot draw any conclusions"; | |
1140 | r = -EAGAIN; | |
1141 | return; | |
1142 | } | |
1143 | for (auto osd : osds) { | |
1144 | auto p = pg_map.pg_by_osd.find(osd); | |
1145 | if (p != pg_map.pg_by_osd.end()) { | |
1146 | for (auto& pgid : p->second) { | |
1147 | --pg_delta[pgid]; | |
1148 | } | |
1149 | } | |
1150 | } | |
1151 | for (auto& p : pg_delta) { | |
1152 | auto q = pg_map.pg_stat.find(p.first); | |
1153 | if (q == pg_map.pg_stat.end()) { | |
1154 | ss << "missing information about " << p.first << "; cannot draw" | |
1155 | << " any conclusions"; | |
1156 | r = -EAGAIN; | |
1157 | return; | |
1158 | } | |
1159 | if (!(q->second.state & PG_STATE_ACTIVE) || | |
1160 | (q->second.state & PG_STATE_DEGRADED)) { | |
1161 | // we don't currently have a good way to tell *how* degraded | |
1162 | // a degraded PG is, so we have to assume we cannot remove | |
1163 | // any more replicas/shards. | |
1164 | ++dangerous_pgs; | |
1165 | continue; | |
1166 | } | |
1167 | const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool()); | |
1168 | if (!pi) { | |
1169 | ++dangerous_pgs; // pool is creating or deleting | |
1170 | } else { | |
1171 | if (q->second.acting.size() + p.second < pi->min_size) { | |
1172 | ++dangerous_pgs; | |
1173 | } | |
1174 | } | |
1175 | } | |
1176 | }); | |
1177 | }); | |
1178 | if (r) { | |
1179 | cmdctx->reply(r, ss); | |
1180 | return true; | |
1181 | } | |
1182 | if (dangerous_pgs) { | |
1183 | ss << dangerous_pgs << " PGs are already degraded or might become " | |
1184 | << "unavailable"; | |
1185 | cmdctx->reply(-EBUSY, ss); | |
1186 | return true; | |
1187 | } | |
1188 | ss << "OSD(s) " << osds << " are ok to stop without reducing" | |
1189 | << " availability, provided there are no other concurrent failures" | |
1190 | << " or interventions. " << pg_delta.size() << " PGs are likely to be" | |
1191 | << " degraded (but remain available) as a result."; | |
1192 | cmdctx->reply(0, ss); | |
1193 | return true; | |
c07f9fc5 FG |
1194 | } else if (prefix == "pg force-recovery" || |
1195 | prefix == "pg force-backfill" || | |
1196 | prefix == "pg cancel-force-recovery" || | |
1197 | prefix == "pg cancel-force-backfill") { | |
1198 | string forceop = prefix.substr(3, string::npos); | |
1199 | list<pg_t> parsed_pgs; | |
1200 | map<int, list<pg_t> > osdpgs; | |
1201 | ||
1202 | // figure out actual op just once | |
1203 | int actual_op = 0; | |
1204 | if (forceop == "force-recovery") { | |
1205 | actual_op = OFR_RECOVERY; | |
1206 | } else if (forceop == "force-backfill") { | |
1207 | actual_op = OFR_BACKFILL; | |
1208 | } else if (forceop == "cancel-force-backfill") { | |
1209 | actual_op = OFR_BACKFILL | OFR_CANCEL; | |
1210 | } else if (forceop == "cancel-force-recovery") { | |
1211 | actual_op = OFR_RECOVERY | OFR_CANCEL; | |
1212 | } | |
1213 | ||
1214 | // covnert pg names to pgs, discard any invalid ones while at it | |
1215 | { | |
1216 | // we don't want to keep pgidstr and pgidstr_nodup forever | |
1217 | vector<string> pgidstr; | |
1218 | // get pgids to process and prune duplicates | |
1219 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); | |
1220 | set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end()); | |
1221 | if (pgidstr.size() != pgidstr_nodup.size()) { | |
1222 | // move elements only when there were duplicates, as this | |
1223 | // reorders them | |
1224 | pgidstr.resize(pgidstr_nodup.size()); | |
1225 | auto it = pgidstr_nodup.begin(); | |
1226 | for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) { | |
1227 | pgidstr[i] = std::move(*it++); | |
1228 | } | |
1229 | } | |
1230 | ||
1231 | cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
1232 | for (auto& pstr : pgidstr) { | |
1233 | pg_t parsed_pg; | |
1234 | if (!parsed_pg.parse(pstr.c_str())) { | |
1235 | ss << "invalid pgid '" << pstr << "'; "; | |
1236 | r = -EINVAL; | |
1237 | } else { | |
1238 | auto workit = pg_map.pg_stat.find(parsed_pg); | |
1239 | if (workit == pg_map.pg_stat.end()) { | |
b32b8144 | 1240 | ss << "pg " << pstr << " does not exist; "; |
c07f9fc5 FG |
1241 | r = -ENOENT; |
1242 | } else { | |
1243 | pg_stat_t workpg = workit->second; | |
1244 | ||
1245 | // discard pgs for which user requests are pointless | |
1246 | switch (actual_op) | |
1247 | { | |
1248 | case OFR_RECOVERY: | |
1249 | if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) { | |
1250 | // don't return error, user script may be racing with cluster. not fatal. | |
1251 | ss << "pg " << pstr << " doesn't require recovery; "; | |
1252 | continue; | |
1253 | } else if (workpg.state & PG_STATE_FORCED_RECOVERY) { | |
1254 | ss << "pg " << pstr << " recovery already forced; "; | |
1255 | // return error, as it may be a bug in user script | |
1256 | r = -EINVAL; | |
1257 | continue; | |
1258 | } | |
1259 | break; | |
1260 | case OFR_BACKFILL: | |
3efd9988 | 1261 | if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) { |
c07f9fc5 FG |
1262 | ss << "pg " << pstr << " doesn't require backfilling; "; |
1263 | continue; | |
1264 | } else if (workpg.state & PG_STATE_FORCED_BACKFILL) { | |
1265 | ss << "pg " << pstr << " backfill already forced; "; | |
1266 | r = -EINVAL; | |
1267 | continue; | |
1268 | } | |
1269 | break; | |
1270 | case OFR_BACKFILL | OFR_CANCEL: | |
1271 | if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) { | |
1272 | ss << "pg " << pstr << " backfill not forced; "; | |
1273 | continue; | |
1274 | } | |
1275 | break; | |
1276 | case OFR_RECOVERY | OFR_CANCEL: | |
1277 | if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) { | |
1278 | ss << "pg " << pstr << " recovery not forced; "; | |
1279 | continue; | |
1280 | } | |
1281 | break; | |
1282 | default: | |
1283 | assert(0 == "actual_op value is not supported"); | |
1284 | } | |
1285 | ||
1286 | parsed_pgs.push_back(std::move(parsed_pg)); | |
1287 | } | |
1288 | } | |
1289 | } | |
1290 | ||
1291 | // group pgs to process by osd | |
1292 | for (auto& pgid : parsed_pgs) { | |
1293 | auto workit = pg_map.pg_stat.find(pgid); | |
1294 | if (workit != pg_map.pg_stat.end()) { | |
1295 | pg_stat_t workpg = workit->second; | |
1296 | set<int32_t> osds(workpg.up.begin(), workpg.up.end()); | |
1297 | osds.insert(workpg.acting.begin(), workpg.acting.end()); | |
1298 | for (auto i : osds) { | |
1299 | osdpgs[i].push_back(pgid); | |
1300 | } | |
1301 | } | |
1302 | } | |
1303 | ||
1304 | }); | |
1305 | } | |
1306 | ||
1307 | // respond with error only when no pgs are correct | |
1308 | // yes, in case of mixed errors, only the last one will be emitted, | |
1309 | // but the message presented will be fine | |
1310 | if (parsed_pgs.size() != 0) { | |
1311 | // clear error to not confuse users/scripts | |
1312 | r = 0; | |
1313 | } | |
1314 | ||
1315 | // optimize the command -> messages conversion, use only one message per distinct OSD | |
1316 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1317 | for (auto& i : osdpgs) { | |
1318 | if (osdmap.is_up(i.first)) { | |
1319 | vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end())); | |
1320 | auto p = osd_cons.find(i.first); | |
1321 | if (p == osd_cons.end()) { | |
1322 | ss << "osd." << i.first << " is not currently connected"; | |
1323 | r = -EAGAIN; | |
1324 | continue; | |
1325 | } | |
1326 | for (auto& con : p->second) { | |
1327 | con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op)); | |
1328 | } | |
1329 | ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; "; | |
1330 | } | |
1331 | } | |
1332 | }); | |
1333 | ss << std::endl; | |
1334 | cmdctx->reply(r, ss); | |
1335 | return true; | |
7c673cae FG |
1336 | } else { |
1337 | r = cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
1338 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
1339 | return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap, | |
1340 | f.get(), &ss, &cmdctx->odata); | |
1341 | }); | |
1342 | }); | |
1343 | ||
1344 | if (r != -EOPNOTSUPP) { | |
1345 | cmdctx->reply(r, ss); | |
1346 | return true; | |
1347 | } | |
1348 | } | |
1349 | ||
1350 | // None of the special native commands, | |
3efd9988 | 1351 | ActivePyModule *handler = nullptr; |
c07f9fc5 | 1352 | auto py_commands = py_modules.get_py_commands(); |
7c673cae FG |
1353 | for (const auto &pyc : py_commands) { |
1354 | auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring); | |
1355 | dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl; | |
1356 | if (pyc_prefix == prefix) { | |
1357 | handler = pyc.handler; | |
1358 | break; | |
1359 | } | |
1360 | } | |
1361 | ||
1362 | if (handler == nullptr) { | |
1363 | ss << "No handler found for '" << prefix << "'"; | |
1364 | dout(4) << "No handler found for '" << prefix << "'" << dendl; | |
1365 | cmdctx->reply(-EINVAL, ss); | |
1366 | return true; | |
1367 | } else { | |
1368 | // Okay, now we have a handler to call, but we must not call it | |
1369 | // in this thread, because the python handlers can do anything, | |
1370 | // including blocking, and including calling back into mgr. | |
1371 | dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl; | |
1372 | finisher.queue(new FunctionContext([cmdctx, handler](int r_) { | |
1373 | std::stringstream ds; | |
1374 | std::stringstream ss; | |
1375 | int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss); | |
1376 | cmdctx->odata.append(ds); | |
1377 | cmdctx->reply(r, ss); | |
1378 | })); | |
1379 | return true; | |
1380 | } | |
1381 | } | |
31f18b77 | 1382 | |
224ce89b WB |
// Drop service daemons whose last beacon is older than the configured
// grace period (mgr_service_beacon_grace), and remove any service that
// ends up with no daemons.  Marks pending_service_map_dirty whenever an
// entry is erased so send_report() knows to publish a new epoch.
void DaemonServer::_prune_pending_service_map()
{
  utime_t cutoff = ceph_clock_now();
  cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
  auto p = pending_service_map.services.begin();
  while (p != pending_service_map.services.end()) {
    auto q = p->second.daemons.begin();
    while (q != p->second.daemons.end()) {
      DaemonKey key(p->first, q->first);
      if (!daemon_state.exists(key)) {
        // Inconsistency between the pending map and daemon_state; log it
        // and keep the entry rather than guessing it is stale.
        derr << "missing key " << key << dendl;
        ++q;
        continue;
      }
      auto daemon = daemon_state.get(key);
      Mutex::Locker l(daemon->lock);  // protect last_service_beacon
      if (daemon->last_service_beacon == utime_t()) {
	// we must have just restarted; assume they are alive now.
	daemon->last_service_beacon = ceph_clock_now();
	++q;
	continue;
      }
      if (daemon->last_service_beacon < cutoff) {
	dout(10) << "pruning stale " << p->first << "." << q->first
		 << " last_beacon " << daemon->last_service_beacon << dendl;
	q = p->second.daemons.erase(q);
	pending_service_map_dirty = pending_service_map.epoch;
      } else {
	++q;
      }
    }
    if (p->second.daemons.empty()) {
      // Service has no live daemons left; remove it entirely.
      p = pending_service_map.services.erase(p);
      pending_service_map_dirty = pending_service_map.epoch;
    } else {
      ++p;
    }
  }
}
1422 | ||
31f18b77 FG |
// Build and send an MMonMgrReport to the monitor: health checks from the
// python modules, a PGMap digest, an updated ServiceMap (if dirty), and
// aggregated OSD health metrics.
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    // Hold off on reporting PG state until all OSDs have checked in, but
    // give up after four stats periods so we don't stall forever.
    if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
	      << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
	      << dendl;
      return;
    }
  }

  auto m = new MMonMgrReport();
  // Health checks produced by active python modules.
  py_modules.get_health_checks(&m->health_checks);

  cluster_state.with_pgmap([&](const PGMap& pg_map) {
      cluster_state.update_delta_stats();

      if (pending_service_map.epoch) {
	_prune_pending_service_map();
	// Only encode/publish the service map when something changed in
	// the current (unpublished) epoch; bump the epoch after sending.
	if (pending_service_map_dirty >= pending_service_map.epoch) {
	  pending_service_map.modified = ceph_clock_now();
	  ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
	  dout(10) << "sending service_map e" << pending_service_map.epoch
		   << dendl;
	  pending_service_map.epoch++;
	}
      }

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
	  // FIXME: no easy way to get mon features here.  this will do for
	  // now, though, as long as we don't make a backward-incompat change.
	  pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
	  dout(10) << pg_map << dendl;

	  // Merge PG-derived health checks into the report.
	  pg_map.get_health_checks(g_ceph_context, osdmap,
				   &m->health_checks);

	  dout(10) << m->health_checks.checks.size() << " health checks"
		   << dendl;
	  dout(20) << "health checks:\n";
	  JSONFormatter jf(true);
	  jf.dump_object("health_checks", m->health_checks);
	  jf.flush(*_dout);
	  *_dout << dendl;
	});
    });

  // Aggregate per-OSD health metrics by metric type, then summarize each
  // type into the report's health checks.
  auto osds = daemon_state.get_by_service("osd");
  map<osd_metric, unique_ptr<OSDHealthMetricCollector>> accumulated;
  for (const auto& osd : osds) {
    Mutex::Locker l(osd.second->lock);  // protect osd_health_metrics
    for (const auto& metric : osd.second->osd_health_metrics) {
      auto acc = accumulated.find(metric.get_type());
      if (acc == accumulated.end()) {
	auto collector = OSDHealthMetricCollector::create(metric.get_type());
	if (!collector) {
	  // Unknown metric type (perhaps from a newer OSD); skip it.
	  derr << __func__ << " " << osd.first << "." << osd.second
	       << " sent me an unknown health metric: "
	       << static_cast<uint8_t>(metric.get_type()) << dendl;
	  continue;
	}
	tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
						    std::move(collector));
      }
      acc->second->update(osd.first, metric);
    }
  }
  for (const auto& acc : accumulated) {
    acc.second->summarize(m->health_checks);
  }
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  //       so, or the state is updated.
  monc->send_mon_message(m);
}
224ce89b WB |
1502 | |
// Called when a new ServiceMap arrives from the monitor.  Seeds (or
// advances the epoch of) our pending_service_map, creates DaemonState
// entries for service daemons we don't know about yet, and culls the
// ones that are no longer present in the map.
void DaemonServer::got_service_map()
{
  Mutex::Locker l(lock);

  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
      if (pending_service_map.epoch == 0) {
	// we just started up
	dout(10) << "got initial map e" << service_map.epoch << dendl;
	pending_service_map = service_map;
      } else {
	// we were already active and therefore must have persisted it,
	// which means ours is the same or newer.
	dout(10) << "got updated map e" << service_map.epoch << dendl;
      }
      // Next epoch we publish will be one past what the mon has.
      pending_service_map.epoch = service_map.epoch + 1;
    });

  // cull missing daemons, populate new ones
  for (auto& p : pending_service_map.services) {
    std::set<std::string> names;
    for (auto& q : p.second.daemons) {
      names.insert(q.first);
      DaemonKey key(p.first, q.first);
      if (!daemon_state.exists(key)) {
	auto daemon = std::make_shared<DaemonState>(daemon_state.types);
	daemon->key = key;
	daemon->metadata = q.second.metadata;
	if (q.second.metadata.count("hostname")) {
	  daemon->hostname = q.second.metadata["hostname"];
	}
	daemon->service_daemon = true;
	daemon_state.insert(daemon);
	dout(10) << "added missing " << key << dendl;
      }
    }
    // Remove daemon_state entries for this service not named in the map.
    daemon_state.cull(p.first, names);
  }
}
3efd9988 FG |
1541 | |
1542 | ||
1543 | const char** DaemonServer::get_tracked_conf_keys() const | |
1544 | { | |
1545 | static const char *KEYS[] = { | |
1546 | "mgr_stats_threshold", | |
1547 | "mgr_stats_period", | |
1548 | nullptr | |
1549 | }; | |
1550 | ||
1551 | return KEYS; | |
1552 | } | |
1553 | ||
1554 | void DaemonServer::handle_conf_change(const struct md_config_t *conf, | |
1555 | const std::set <std::string> &changed) | |
1556 | { | |
1557 | dout(4) << "ohai" << dendl; | |
1558 | // We may be called within lock (via MCommand `config set`) or outwith the | |
1559 | // lock (via admin socket `config set`), so handle either case. | |
1560 | const bool initially_locked = lock.is_locked_by_me(); | |
1561 | if (!initially_locked) { | |
1562 | lock.Lock(); | |
1563 | } | |
1564 | ||
1565 | if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) { | |
1566 | dout(4) << "Updating stats threshold/period on " | |
1567 | << daemon_connections.size() << " clients" << dendl; | |
1568 | // Send a fresh MMgrConfigure to all clients, so that they can follow | |
1569 | // the new policy for transmitting stats | |
1570 | for (auto &c : daemon_connections) { | |
1571 | _send_configure(c); | |
1572 | } | |
1573 | } | |
1574 | } | |
1575 | ||
1576 | void DaemonServer::_send_configure(ConnectionRef c) | |
1577 | { | |
1578 | assert(lock.is_locked_by_me()); | |
1579 | ||
1580 | auto configure = new MMgrConfigure(); | |
1581 | configure->stats_period = g_conf->get_val<int64_t>("mgr_stats_period"); | |
1582 | configure->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold"); | |
1583 | c->send_message(configure); | |
1584 | } | |
1585 |