]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 John Spray <john.spray@redhat.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | ||
14 | #include "DaemonServer.h" | |
15 | ||
31f18b77 | 16 | #include "include/str_list.h" |
7c673cae | 17 | #include "auth/RotatingKeyRing.h" |
7c673cae FG |
18 | #include "json_spirit/json_spirit_writer.h" |
19 | ||
20 | #include "messages/MMgrOpen.h" | |
21 | #include "messages/MMgrConfigure.h" | |
31f18b77 | 22 | #include "messages/MMonMgrReport.h" |
7c673cae FG |
23 | #include "messages/MCommand.h" |
24 | #include "messages/MCommandReply.h" | |
25 | #include "messages/MPGStats.h" | |
26 | #include "messages/MOSDScrub.h" | |
27 | ||
28 | #define dout_context g_ceph_context | |
29 | #define dout_subsys ceph_subsys_mgr | |
30 | #undef dout_prefix | |
31 | #define dout_prefix *_dout << "mgr.server " << __func__ << " " | |
32 | ||
33 | DaemonServer::DaemonServer(MonClient *monc_, | |
34 | Finisher &finisher_, | |
35 | DaemonStateIndex &daemon_state_, | |
36 | ClusterState &cluster_state_, | |
37 | PyModules &py_modules_, | |
38 | LogChannelRef clog_, | |
39 | LogChannelRef audit_clog_) | |
40 | : Dispatcher(g_ceph_context), | |
41 | client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes", | |
42 | g_conf->mgr_client_bytes)), | |
43 | client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages", | |
44 | g_conf->mgr_client_messages)), | |
45 | osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes", | |
46 | g_conf->mgr_osd_bytes)), | |
47 | osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages", | |
48 | g_conf->mgr_osd_messages)), | |
49 | mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes", | |
50 | g_conf->mgr_mds_bytes)), | |
51 | mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages", | |
52 | g_conf->mgr_mds_messages)), | |
53 | mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes", | |
54 | g_conf->mgr_mon_bytes)), | |
55 | mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages", | |
56 | g_conf->mgr_mon_messages)), | |
57 | msgr(nullptr), | |
58 | monc(monc_), | |
59 | finisher(finisher_), | |
60 | daemon_state(daemon_state_), | |
61 | cluster_state(cluster_state_), | |
62 | py_modules(py_modules_), | |
63 | clog(clog_), | |
64 | audit_clog(audit_clog_), | |
65 | auth_registry(g_ceph_context, | |
66 | g_conf->auth_supported.empty() ? | |
67 | g_conf->auth_cluster_required : | |
68 | g_conf->auth_supported), | |
69 | lock("DaemonServer") | |
70 | {} | |
71 | ||
72 | DaemonServer::~DaemonServer() { | |
73 | delete msgr; | |
74 | } | |
75 | ||
76 | int DaemonServer::init(uint64_t gid, entity_addr_t client_addr) | |
77 | { | |
78 | // Initialize Messenger | |
79 | std::string public_msgr_type = g_conf->ms_public_type.empty() ? | |
80 | g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type; | |
81 | msgr = Messenger::create(g_ceph_context, public_msgr_type, | |
82 | entity_name_t::MGR(gid), | |
83 | "mgr", | |
84 | getpid(), 0); | |
85 | msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
86 | ||
87 | // throttle clients | |
88 | msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT, | |
89 | client_byte_throttler.get(), | |
90 | client_msg_throttler.get()); | |
91 | ||
92 | // servers | |
93 | msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, | |
94 | osd_byte_throttler.get(), | |
95 | osd_msg_throttler.get()); | |
96 | msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, | |
97 | mds_byte_throttler.get(), | |
98 | mds_msg_throttler.get()); | |
99 | msgr->set_policy_throttlers(entity_name_t::TYPE_MON, | |
100 | mon_byte_throttler.get(), | |
101 | mon_msg_throttler.get()); | |
102 | ||
103 | int r = msgr->bind(g_conf->public_addr); | |
104 | if (r < 0) { | |
105 | derr << "unable to bind mgr to " << g_conf->public_addr << dendl; | |
106 | return r; | |
107 | } | |
108 | ||
109 | msgr->set_myname(entity_name_t::MGR(gid)); | |
110 | msgr->set_addr_unknowns(client_addr); | |
111 | ||
112 | msgr->start(); | |
113 | msgr->add_dispatcher_tail(this); | |
114 | ||
115 | return 0; | |
116 | } | |
117 | ||
118 | entity_addr_t DaemonServer::get_myaddr() const | |
119 | { | |
120 | return msgr->get_myaddr(); | |
121 | } | |
122 | ||
123 | ||
124 | bool DaemonServer::ms_verify_authorizer(Connection *con, | |
125 | int peer_type, | |
126 | int protocol, | |
127 | ceph::bufferlist& authorizer_data, | |
128 | ceph::bufferlist& authorizer_reply, | |
129 | bool& is_valid, | |
130 | CryptoKey& session_key) | |
131 | { | |
132 | auto handler = auth_registry.get_handler(protocol); | |
133 | if (!handler) { | |
134 | dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl; | |
135 | is_valid = false; | |
136 | return true; | |
137 | } | |
138 | ||
139 | MgrSessionRef s(new MgrSession(cct)); | |
140 | s->inst.addr = con->get_peer_addr(); | |
141 | AuthCapsInfo caps_info; | |
142 | ||
143 | is_valid = handler->verify_authorizer( | |
144 | cct, monc->rotating_secrets.get(), | |
145 | authorizer_data, | |
146 | authorizer_reply, s->entity_name, | |
147 | s->global_id, caps_info, | |
148 | session_key); | |
149 | ||
150 | if (is_valid) { | |
151 | if (caps_info.allow_all) { | |
152 | dout(10) << " session " << s << " " << s->entity_name | |
153 | << " allow_all" << dendl; | |
154 | s->caps.set_allow_all(); | |
155 | } | |
156 | if (caps_info.caps.length() > 0) { | |
157 | bufferlist::iterator p = caps_info.caps.begin(); | |
158 | string str; | |
159 | try { | |
160 | ::decode(str, p); | |
161 | } | |
162 | catch (buffer::error& e) { | |
163 | } | |
164 | bool success = s->caps.parse(str); | |
165 | if (success) { | |
166 | dout(10) << " session " << s << " " << s->entity_name | |
167 | << " has caps " << s->caps << " '" << str << "'" << dendl; | |
168 | } else { | |
169 | dout(10) << " session " << s << " " << s->entity_name | |
170 | << " failed to parse caps '" << str << "'" << dendl; | |
171 | is_valid = false; | |
172 | } | |
173 | } | |
174 | con->set_priv(s->get()); | |
31f18b77 FG |
175 | |
176 | if (peer_type == CEPH_ENTITY_TYPE_OSD) { | |
177 | Mutex::Locker l(lock); | |
178 | s->osd_id = atoi(s->entity_name.get_id().c_str()); | |
179 | dout(10) << __func__ << " registering osd." << s->osd_id << " session " | |
180 | << s << " con " << con << dendl; | |
181 | osd_cons[s->osd_id].insert(con); | |
182 | } | |
7c673cae FG |
183 | } |
184 | ||
185 | return true; | |
186 | } | |
187 | ||
188 | ||
189 | bool DaemonServer::ms_get_authorizer(int dest_type, | |
190 | AuthAuthorizer **authorizer, bool force_new) | |
191 | { | |
192 | dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl; | |
193 | ||
194 | if (dest_type == CEPH_ENTITY_TYPE_MON) { | |
195 | return true; | |
196 | } | |
197 | ||
198 | if (force_new) { | |
199 | if (monc->wait_auth_rotating(10) < 0) | |
200 | return false; | |
201 | } | |
202 | ||
203 | *authorizer = monc->build_authorizer(dest_type); | |
204 | dout(20) << "got authorizer " << *authorizer << dendl; | |
205 | return *authorizer != NULL; | |
206 | } | |
207 | ||
31f18b77 FG |
208 | bool DaemonServer::ms_handle_reset(Connection *con) |
209 | { | |
210 | if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { | |
211 | MgrSessionRef session(static_cast<MgrSession*>(con->get_priv())); | |
212 | if (!session) { | |
213 | return false; | |
214 | } | |
215 | session->put(); // SessionRef takes a ref | |
216 | Mutex::Locker l(lock); | |
217 | dout(10) << __func__ << " unregistering osd." << session->osd_id | |
218 | << " session " << session << " con " << con << dendl; | |
219 | osd_cons[session->osd_id].erase(con); | |
220 | } | |
221 | return false; | |
222 | } | |
223 | ||
7c673cae FG |
224 | bool DaemonServer::ms_handle_refused(Connection *con) |
225 | { | |
226 | // do nothing for now | |
227 | return false; | |
228 | } | |
229 | ||
230 | bool DaemonServer::ms_dispatch(Message *m) | |
231 | { | |
232 | Mutex::Locker l(lock); | |
233 | ||
234 | switch (m->get_type()) { | |
235 | case MSG_PGSTATS: | |
236 | cluster_state.ingest_pgstats(static_cast<MPGStats*>(m)); | |
237 | m->put(); | |
238 | return true; | |
239 | case MSG_MGR_REPORT: | |
240 | return handle_report(static_cast<MMgrReport*>(m)); | |
241 | case MSG_MGR_OPEN: | |
242 | return handle_open(static_cast<MMgrOpen*>(m)); | |
243 | case MSG_COMMAND: | |
244 | return handle_command(static_cast<MCommand*>(m)); | |
245 | default: | |
246 | dout(1) << "Unhandled message type " << m->get_type() << dendl; | |
247 | return false; | |
248 | }; | |
249 | } | |
250 | ||
251 | void DaemonServer::shutdown() | |
252 | { | |
253 | dout(10) << __func__ << dendl; | |
254 | msgr->shutdown(); | |
255 | msgr->wait(); | |
256 | dout(10) << __func__ << " done" << dendl; | |
257 | } | |
258 | ||
259 | ||
260 | ||
261 | bool DaemonServer::handle_open(MMgrOpen *m) | |
262 | { | |
31f18b77 FG |
263 | uint32_t type = m->get_connection()->get_peer_type(); |
264 | DaemonKey key(type, m->daemon_name); | |
7c673cae FG |
265 | |
266 | dout(4) << "from " << m->get_connection() << " name " | |
31f18b77 | 267 | << ceph_entity_type_name(type) << "." << m->daemon_name << dendl; |
7c673cae FG |
268 | |
269 | auto configure = new MMgrConfigure(); | |
31f18b77 FG |
270 | if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) { |
271 | // We don't want clients to send us stats | |
272 | configure->stats_period = 0; | |
273 | } else { | |
274 | configure->stats_period = g_conf->mgr_stats_period; | |
275 | } | |
7c673cae FG |
276 | m->get_connection()->send_message(configure); |
277 | ||
278 | if (daemon_state.exists(key)) { | |
279 | dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl; | |
280 | daemon_state.get(key)->perf_counters.clear(); | |
281 | } | |
282 | ||
283 | m->put(); | |
284 | return true; | |
285 | } | |
286 | ||
287 | bool DaemonServer::handle_report(MMgrReport *m) | |
288 | { | |
31f18b77 FG |
289 | uint32_t type = m->get_connection()->get_peer_type(); |
290 | DaemonKey key(type, m->daemon_name); | |
7c673cae FG |
291 | |
292 | dout(4) << "from " << m->get_connection() << " name " | |
31f18b77 FG |
293 | << ceph_entity_type_name(type) << "." << m->daemon_name << dendl; |
294 | ||
295 | if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) { | |
296 | // Clients should not be sending us stats | |
297 | dout(4) << "rejecting report from client " << m->daemon_name << dendl; | |
298 | m->put(); | |
299 | return true; | |
300 | } | |
7c673cae FG |
301 | |
302 | DaemonStatePtr daemon; | |
303 | if (daemon_state.exists(key)) { | |
304 | dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl; | |
305 | daemon = daemon_state.get(key); | |
306 | } else { | |
307 | dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl; | |
308 | daemon = std::make_shared<DaemonState>(daemon_state.types); | |
309 | // FIXME: crap, we don't know the hostname at this stage. | |
310 | daemon->key = key; | |
311 | daemon_state.insert(daemon); | |
312 | // FIXME: we should request metadata at this stage | |
313 | } | |
314 | ||
315 | assert(daemon != nullptr); | |
316 | auto &daemon_counters = daemon->perf_counters; | |
317 | daemon_counters.update(m); | |
318 | ||
319 | m->put(); | |
320 | return true; | |
321 | } | |
322 | ||
323 | struct MgrCommand { | |
324 | string cmdstring; | |
325 | string helpstring; | |
326 | string module; | |
327 | string perm; | |
328 | string availability; | |
329 | ||
330 | bool requires_perm(char p) const { | |
331 | return (perm.find(p) != string::npos); | |
332 | } | |
333 | ||
334 | } mgr_commands[] = { | |
335 | ||
336 | #define COMMAND(parsesig, helptext, module, perm, availability) \ | |
337 | {parsesig, helptext, module, perm, availability}, | |
338 | #include "MgrCommands.h" | |
339 | #undef COMMAND | |
340 | }; | |
341 | ||
342 | void DaemonServer::_generate_command_map( | |
343 | map<string,cmd_vartype>& cmdmap, | |
344 | map<string,string> ¶m_str_map) | |
345 | { | |
346 | for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin(); | |
347 | p != cmdmap.end(); ++p) { | |
348 | if (p->first == "prefix") | |
349 | continue; | |
350 | if (p->first == "caps") { | |
351 | vector<string> cv; | |
352 | if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && | |
353 | cv.size() % 2 == 0) { | |
354 | for (unsigned i = 0; i < cv.size(); i += 2) { | |
355 | string k = string("caps_") + cv[i]; | |
356 | param_str_map[k] = cv[i + 1]; | |
357 | } | |
358 | continue; | |
359 | } | |
360 | } | |
361 | param_str_map[p->first] = cmd_vartype_stringify(p->second); | |
362 | } | |
363 | } | |
364 | ||
365 | const MgrCommand *DaemonServer::_get_mgrcommand( | |
366 | const string &cmd_prefix, | |
367 | MgrCommand *cmds, | |
368 | int cmds_size) | |
369 | { | |
370 | MgrCommand *this_cmd = NULL; | |
371 | for (MgrCommand *cp = cmds; | |
372 | cp < &cmds[cmds_size]; cp++) { | |
373 | if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { | |
374 | this_cmd = cp; | |
375 | break; | |
376 | } | |
377 | } | |
378 | return this_cmd; | |
379 | } | |
380 | ||
381 | bool DaemonServer::_allowed_command( | |
382 | MgrSession *s, | |
383 | const string &module, | |
384 | const string &prefix, | |
385 | const map<string,cmd_vartype>& cmdmap, | |
386 | const map<string,string>& param_str_map, | |
387 | const MgrCommand *this_cmd) { | |
388 | ||
389 | if (s->entity_name.is_mon()) { | |
390 | // mon is all-powerful. even when it is forwarding commands on behalf of | |
391 | // old clients; we expect the mon is validating commands before proxying! | |
392 | return true; | |
393 | } | |
394 | ||
395 | bool cmd_r = this_cmd->requires_perm('r'); | |
396 | bool cmd_w = this_cmd->requires_perm('w'); | |
397 | bool cmd_x = this_cmd->requires_perm('x'); | |
398 | ||
399 | bool capable = s->caps.is_capable( | |
400 | g_ceph_context, | |
401 | CEPH_ENTITY_TYPE_MGR, | |
402 | s->entity_name, | |
403 | module, prefix, param_str_map, | |
404 | cmd_r, cmd_w, cmd_x); | |
405 | ||
406 | dout(10) << " " << s->entity_name << " " | |
407 | << (capable ? "" : "not ") << "capable" << dendl; | |
408 | return capable; | |
409 | } | |
410 | ||
411 | bool DaemonServer::handle_command(MCommand *m) | |
412 | { | |
413 | int r = 0; | |
414 | std::stringstream ss; | |
415 | std::string prefix; | |
416 | ||
417 | assert(lock.is_locked_by_me()); | |
418 | ||
419 | /** | |
420 | * The working data for processing an MCommand. This lives in | |
421 | * a class to enable passing it into other threads for processing | |
422 | * outside of the thread/locks that called handle_command. | |
423 | */ | |
424 | class CommandContext | |
425 | { | |
426 | public: | |
427 | MCommand *m; | |
428 | bufferlist odata; | |
429 | cmdmap_t cmdmap; | |
430 | ||
431 | CommandContext(MCommand *m_) | |
432 | : m(m_) | |
433 | { | |
434 | } | |
435 | ||
436 | ~CommandContext() | |
437 | { | |
438 | m->put(); | |
439 | } | |
440 | ||
441 | void reply(int r, const std::stringstream &ss) | |
442 | { | |
443 | reply(r, ss.str()); | |
444 | } | |
445 | ||
446 | void reply(int r, const std::string &rs) | |
447 | { | |
448 | // Let the connection drop as soon as we've sent our response | |
449 | ConnectionRef con = m->get_connection(); | |
450 | if (con) { | |
451 | con->mark_disposable(); | |
452 | } | |
453 | ||
454 | dout(1) << "do_command r=" << r << " " << rs << dendl; | |
455 | if (con) { | |
456 | MCommandReply *reply = new MCommandReply(r, rs); | |
457 | reply->set_tid(m->get_tid()); | |
458 | reply->set_data(odata); | |
459 | con->send_message(reply); | |
460 | } | |
461 | } | |
462 | }; | |
463 | ||
464 | /** | |
465 | * A context for receiving a bufferlist/error string from a background | |
466 | * function and then calling back to a CommandContext when it's done | |
467 | */ | |
468 | class ReplyOnFinish : public Context { | |
469 | std::shared_ptr<CommandContext> cmdctx; | |
470 | ||
471 | public: | |
472 | bufferlist from_mon; | |
473 | string outs; | |
474 | ||
475 | ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_) | |
476 | : cmdctx(cmdctx_) | |
477 | {} | |
478 | void finish(int r) override { | |
479 | cmdctx->odata.claim_append(from_mon); | |
480 | cmdctx->reply(r, outs); | |
481 | } | |
482 | }; | |
483 | ||
484 | std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m); | |
485 | ||
486 | MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv())); | |
487 | if (!session) { | |
488 | return true; | |
489 | } | |
490 | session->put(); // SessionRef takes a ref | |
491 | if (session->inst.name == entity_name_t()) | |
492 | session->inst.name = m->get_source(); | |
493 | ||
494 | std::string format; | |
495 | boost::scoped_ptr<Formatter> f; | |
496 | map<string,string> param_str_map; | |
497 | ||
498 | if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) { | |
499 | cmdctx->reply(-EINVAL, ss); | |
500 | return true; | |
501 | } | |
502 | ||
503 | { | |
504 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain")); | |
505 | f.reset(Formatter::create(format)); | |
506 | } | |
507 | ||
508 | cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix); | |
509 | ||
510 | dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl; | |
511 | dout(4) << "prefix=" << prefix << dendl; | |
512 | ||
513 | if (prefix == "get_command_descriptions") { | |
514 | int cmdnum = 0; | |
515 | ||
516 | dout(10) << "reading commands from python modules" << dendl; | |
517 | auto py_commands = py_modules.get_commands(); | |
518 | ||
519 | JSONFormatter f; | |
520 | f.open_object_section("command_descriptions"); | |
521 | for (const auto &pyc : py_commands) { | |
522 | ostringstream secname; | |
523 | secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; | |
524 | dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring | |
525 | << ")" << dendl; | |
526 | dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring, | |
527 | "mgr", pyc.perm, "cli", 0); | |
528 | cmdnum++; | |
529 | } | |
7c673cae | 530 | |
31f18b77 | 531 | for (const auto &cp : mgr_commands) { |
7c673cae FG |
532 | ostringstream secname; |
533 | secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; | |
31f18b77 FG |
534 | dump_cmddesc_to_json(&f, secname.str(), cp.cmdstring, cp.helpstring, |
535 | cp.module, cp.perm, cp.availability, 0); | |
7c673cae FG |
536 | cmdnum++; |
537 | } | |
7c673cae FG |
538 | f.close_section(); // command_descriptions |
539 | f.flush(cmdctx->odata); | |
540 | cmdctx->reply(0, ss); | |
541 | return true; | |
542 | } | |
543 | ||
544 | // lookup command | |
545 | const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands, | |
546 | ARRAY_SIZE(mgr_commands)); | |
547 | _generate_command_map(cmdctx->cmdmap, param_str_map); | |
548 | if (!mgr_cmd) { | |
549 | MgrCommand py_command = {"", "", "py", "rw", "cli"}; | |
550 | if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap, | |
551 | param_str_map, &py_command)) { | |
552 | dout(1) << " access denied" << dendl; | |
31f18b77 FG |
553 | ss << "access denied; does your client key have mgr caps?" |
554 | " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; | |
7c673cae FG |
555 | cmdctx->reply(-EACCES, ss); |
556 | return true; | |
557 | } | |
558 | } else { | |
559 | // validate user's permissions for requested command | |
560 | if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap, | |
561 | param_str_map, mgr_cmd)) { | |
562 | dout(1) << " access denied" << dendl; | |
563 | audit_clog->info() << "from='" << session->inst << "' " | |
564 | << "entity='" << session->entity_name << "' " | |
565 | << "cmd=" << m->cmd << ": access denied"; | |
31f18b77 FG |
566 | ss << "access denied' does your client key have mgr caps?" |
567 | " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; | |
7c673cae FG |
568 | cmdctx->reply(-EACCES, ss); |
569 | return true; | |
570 | } | |
571 | } | |
572 | ||
573 | audit_clog->debug() | |
574 | << "from='" << session->inst << "' " | |
575 | << "entity='" << session->entity_name << "' " | |
576 | << "cmd=" << m->cmd << ": dispatch"; | |
577 | ||
578 | // ----------- | |
579 | // PG commands | |
580 | ||
581 | if (prefix == "pg scrub" || | |
582 | prefix == "pg repair" || | |
583 | prefix == "pg deep-scrub") { | |
584 | string scrubop = prefix.substr(3, string::npos); | |
585 | pg_t pgid; | |
586 | string pgidstr; | |
587 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); | |
588 | if (!pgid.parse(pgidstr.c_str())) { | |
589 | ss << "invalid pgid '" << pgidstr << "'"; | |
590 | cmdctx->reply(-EINVAL, ss); | |
591 | return true; | |
592 | } | |
593 | bool pg_exists = false; | |
594 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
595 | pg_exists = osdmap.pg_exists(pgid); | |
596 | }); | |
597 | if (!pg_exists) { | |
598 | ss << "pg " << pgid << " dne"; | |
599 | cmdctx->reply(-ENOENT, ss); | |
600 | return true; | |
601 | } | |
602 | int acting_primary = -1; | |
7c673cae FG |
603 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { |
604 | acting_primary = osdmap.get_pg_acting_primary(pgid); | |
7c673cae FG |
605 | }); |
606 | if (acting_primary == -1) { | |
607 | ss << "pg " << pgid << " has no primary osd"; | |
608 | cmdctx->reply(-EAGAIN, ss); | |
609 | return true; | |
610 | } | |
31f18b77 FG |
611 | auto p = osd_cons.find(acting_primary); |
612 | if (p == osd_cons.end()) { | |
613 | ss << "pg " << pgid << " primary osd." << acting_primary | |
614 | << " is not currently connected"; | |
615 | cmdctx->reply(-EAGAIN, ss); | |
616 | } | |
7c673cae | 617 | vector<pg_t> pgs = { pgid }; |
31f18b77 FG |
618 | for (auto& con : p->second) { |
619 | con->send_message(new MOSDScrub(monc->get_fsid(), | |
620 | pgs, | |
621 | scrubop == "repair", | |
622 | scrubop == "deep-scrub")); | |
623 | } | |
7c673cae | 624 | ss << "instructing pg " << pgid << " on osd." << acting_primary |
31f18b77 FG |
625 | << " to " << scrubop; |
626 | cmdctx->reply(0, ss); | |
627 | return true; | |
628 | } else if (prefix == "osd scrub" || | |
629 | prefix == "osd deep-scrub" || | |
630 | prefix == "osd repair") { | |
631 | string whostr; | |
632 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr); | |
633 | vector<string> pvec; | |
634 | get_str_vec(prefix, pvec); | |
635 | ||
636 | set<int> osds; | |
637 | if (whostr == "*") { | |
638 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
639 | for (int i = 0; i < osdmap.get_max_osd(); i++) | |
640 | if (osdmap.is_up(i)) { | |
641 | osds.insert(i); | |
642 | } | |
643 | }); | |
644 | } else { | |
645 | long osd = parse_osd_id(whostr.c_str(), &ss); | |
646 | if (osd < 0) { | |
647 | ss << "invalid osd '" << whostr << "'"; | |
648 | cmdctx->reply(-EINVAL, ss); | |
649 | return true; | |
650 | } | |
651 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
652 | if (osdmap.is_up(osd)) { | |
653 | osds.insert(osd); | |
654 | } | |
655 | }); | |
656 | if (osds.empty()) { | |
657 | ss << "osd." << osd << " is not up"; | |
658 | cmdctx->reply(-EAGAIN, ss); | |
659 | return true; | |
660 | } | |
661 | } | |
662 | set<int> sent_osds, failed_osds; | |
663 | for (auto osd : osds) { | |
664 | auto p = osd_cons.find(osd); | |
665 | if (p == osd_cons.end()) { | |
666 | failed_osds.insert(osd); | |
667 | } else { | |
668 | sent_osds.insert(osd); | |
669 | for (auto& con : p->second) { | |
670 | con->send_message(new MOSDScrub(monc->get_fsid(), | |
671 | pvec.back() == "repair", | |
672 | pvec.back() == "deep-scrub")); | |
673 | } | |
674 | } | |
675 | } | |
676 | if (failed_osds.size() == osds.size()) { | |
677 | ss << "failed to instruct osd(s) " << osds << " to " << pvec.back() | |
678 | << " (not connected)"; | |
679 | r = -EAGAIN; | |
680 | } else { | |
681 | ss << "instructed osd(s) " << sent_osds << " to " << pvec.back(); | |
682 | if (!failed_osds.empty()) { | |
683 | ss << "; osd(s) " << failed_osds << " were not connected"; | |
684 | } | |
685 | r = 0; | |
686 | } | |
7c673cae FG |
687 | cmdctx->reply(0, ss); |
688 | return true; | |
689 | } else if (prefix == "osd reweight-by-pg" || | |
690 | prefix == "osd reweight-by-utilization" || | |
691 | prefix == "osd test-reweight-by-pg" || | |
692 | prefix == "osd test-reweight-by-utilization") { | |
693 | bool by_pg = | |
694 | prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg"; | |
695 | bool dry_run = | |
696 | prefix == "osd test-reweight-by-pg" || | |
697 | prefix == "osd test-reweight-by-utilization"; | |
698 | int64_t oload; | |
699 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120)); | |
700 | set<int64_t> pools; | |
701 | vector<string> poolnames; | |
702 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames); | |
703 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
704 | for (const auto& poolname : poolnames) { | |
705 | int64_t pool = osdmap.lookup_pg_pool_name(poolname); | |
706 | if (pool < 0) { | |
707 | ss << "pool '" << poolname << "' does not exist"; | |
708 | r = -ENOENT; | |
709 | } | |
710 | pools.insert(pool); | |
711 | } | |
712 | }); | |
713 | if (r) { | |
714 | cmdctx->reply(r, ss); | |
715 | return true; | |
716 | } | |
717 | double max_change = g_conf->mon_reweight_max_change; | |
718 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change); | |
719 | if (max_change <= 0.0) { | |
720 | ss << "max_change " << max_change << " must be positive"; | |
721 | cmdctx->reply(-EINVAL, ss); | |
722 | return true; | |
723 | } | |
724 | int64_t max_osds = g_conf->mon_reweight_max_osds; | |
725 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds); | |
726 | if (max_osds <= 0) { | |
727 | ss << "max_osds " << max_osds << " must be positive"; | |
728 | cmdctx->reply(-EINVAL, ss); | |
729 | return true; | |
730 | } | |
731 | string no_increasing; | |
732 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing); | |
733 | string out_str; | |
734 | mempool::osdmap::map<int32_t, uint32_t> new_weights; | |
735 | r = cluster_state.with_pgmap([&](const PGMap& pgmap) { | |
736 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
737 | return reweight::by_utilization(osdmap, pgmap, | |
738 | oload, | |
739 | max_change, | |
740 | max_osds, | |
741 | by_pg, | |
742 | pools.empty() ? NULL : &pools, | |
743 | no_increasing == "--no-increasing", | |
744 | &new_weights, | |
745 | &ss, &out_str, f.get()); | |
746 | }); | |
747 | }); | |
748 | if (r >= 0) { | |
749 | dout(10) << "reweight::by_utilization: finished with " << out_str << dendl; | |
750 | } | |
751 | if (f) { | |
752 | f->flush(cmdctx->odata); | |
753 | } else { | |
754 | cmdctx->odata.append(out_str); | |
755 | } | |
756 | if (r < 0) { | |
757 | ss << "FAILED reweight-by-pg"; | |
758 | cmdctx->reply(r, ss); | |
759 | return true; | |
760 | } else if (r == 0 || dry_run) { | |
761 | ss << "no change"; | |
762 | cmdctx->reply(r, ss); | |
763 | return true; | |
764 | } else { | |
765 | json_spirit::Object json_object; | |
766 | for (const auto& osd_weight : new_weights) { | |
767 | json_spirit::Config::add(json_object, | |
768 | std::to_string(osd_weight.first), | |
769 | std::to_string(osd_weight.second)); | |
770 | } | |
771 | string s = json_spirit::write(json_object); | |
772 | std::replace(begin(s), end(s), '\"', '\''); | |
773 | const string cmd = | |
774 | "{" | |
775 | "\"prefix\": \"osd reweightn\", " | |
776 | "\"weights\": \"" + s + "\"" | |
777 | "}"; | |
778 | auto on_finish = new ReplyOnFinish(cmdctx); | |
779 | monc->start_mon_command({cmd}, {}, | |
780 | &on_finish->from_mon, &on_finish->outs, on_finish); | |
781 | return true; | |
782 | } | |
31f18b77 FG |
783 | } else if (prefix == "osd df") { |
784 | string method; | |
785 | cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method); | |
786 | r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) { | |
787 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
788 | print_osd_utilization(osdmap, &pgservice, ss, | |
789 | f.get(), method == "tree"); | |
790 | ||
791 | cmdctx->odata.append(ss); | |
792 | return 0; | |
793 | }); | |
794 | }); | |
795 | cmdctx->reply(r, ""); | |
796 | return true; | |
7c673cae FG |
797 | } else { |
798 | r = cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
799 | return cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
800 | return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap, | |
801 | f.get(), &ss, &cmdctx->odata); | |
802 | }); | |
803 | }); | |
804 | ||
805 | if (r != -EOPNOTSUPP) { | |
806 | cmdctx->reply(r, ss); | |
807 | return true; | |
808 | } | |
809 | } | |
810 | ||
811 | // None of the special native commands, | |
812 | MgrPyModule *handler = nullptr; | |
813 | auto py_commands = py_modules.get_commands(); | |
814 | for (const auto &pyc : py_commands) { | |
815 | auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring); | |
816 | dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl; | |
817 | if (pyc_prefix == prefix) { | |
818 | handler = pyc.handler; | |
819 | break; | |
820 | } | |
821 | } | |
822 | ||
823 | if (handler == nullptr) { | |
824 | ss << "No handler found for '" << prefix << "'"; | |
825 | dout(4) << "No handler found for '" << prefix << "'" << dendl; | |
826 | cmdctx->reply(-EINVAL, ss); | |
827 | return true; | |
828 | } else { | |
829 | // Okay, now we have a handler to call, but we must not call it | |
830 | // in this thread, because the python handlers can do anything, | |
831 | // including blocking, and including calling back into mgr. | |
832 | dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl; | |
833 | finisher.queue(new FunctionContext([cmdctx, handler](int r_) { | |
834 | std::stringstream ds; | |
835 | std::stringstream ss; | |
836 | int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss); | |
837 | cmdctx->odata.append(ds); | |
838 | cmdctx->reply(r, ss); | |
839 | })); | |
840 | return true; | |
841 | } | |
842 | } | |
31f18b77 FG |
843 | |
844 | void DaemonServer::send_report() | |
845 | { | |
846 | auto m = new MMonMgrReport(); | |
847 | cluster_state.with_pgmap([&](const PGMap& pg_map) { | |
848 | cluster_state.update_delta_stats(); | |
849 | ||
850 | // FIXME: reporting health detail here might be a bad idea? | |
851 | cluster_state.with_osdmap([&](const OSDMap& osdmap) { | |
852 | // FIXME: no easy way to get mon features here. this will do for | |
853 | // now, though, as long as we don't make a backward-incompat change. | |
854 | pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL); | |
855 | dout(10) << pg_map << dendl; | |
856 | pg_map.get_health(g_ceph_context, osdmap, | |
857 | m->health_summary, | |
858 | &m->health_detail); | |
859 | }); | |
860 | }); | |
861 | // TODO? We currently do not notify the PyModules | |
862 | // TODO: respect needs_send, so we send the report only if we are asked to do | |
863 | // so, or the state is updated. | |
864 | monc->send_mon_message(m); | |
865 | } |