]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/mon/MonClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / mon / MonClient.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
11fdf7f2
TL
4#include "MonClient.h"
5
6#include <random>
7
8#include <seastar/core/future-util.hh>
9#include <seastar/core/lowres_clock.hh>
9f95a23c 10#include <seastar/core/shared_future.hh>
11fdf7f2
TL
11#include <seastar/util/log.hh>
12
13#include "auth/AuthClientHandler.h"
11fdf7f2
TL
14#include "auth/RotatingKeyRing.h"
15
16#include "common/hostname.h"
17
18#include "crimson/auth/KeyRing.h"
19#include "crimson/common/config_proxy.h"
20#include "crimson/common/log.h"
21#include "crimson/net/Connection.h"
22#include "crimson/net/Errors.h"
23#include "crimson/net/Messenger.h"
24
25#include "messages/MAuth.h"
26#include "messages/MAuthReply.h"
27#include "messages/MConfig.h"
28#include "messages/MLogAck.h"
29#include "messages/MMonCommand.h"
30#include "messages/MMonCommandAck.h"
9f95a23c 31#include "messages/MMonGetMap.h"
11fdf7f2
TL
32#include "messages/MMonGetVersion.h"
33#include "messages/MMonGetVersionReply.h"
34#include "messages/MMonMap.h"
35#include "messages/MMonSubscribe.h"
36#include "messages/MMonSubscribeAck.h"
37
38namespace {
39 seastar::logger& logger()
40 {
9f95a23c 41 return crimson::get_logger(ceph_subsys_monc);
11fdf7f2
TL
42 }
43}
44
9f95a23c 45namespace crimson::mon {
11fdf7f2 46
9f95a23c 47using crimson::common::local_conf;
11fdf7f2
TL
48
49class Connection {
50public:
9f95a23c
TL
51 Connection(const AuthRegistry& auth_registry,
52 crimson::net::ConnectionRef conn,
53 KeyRing* keyring);
f67539c2 54 enum class auth_result_t {
9f95a23c
TL
55 success = 0,
56 failure,
57 canceled
58 };
11fdf7f2 59 seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
9f95a23c 60 // v1
f67539c2 61 seastar::future<auth_result_t> authenticate_v1(
9f95a23c
TL
62 epoch_t epoch,
63 const EntityName& name,
64 uint32_t want_keys);
65 // v2
f67539c2 66 seastar::future<auth_result_t> authenticate_v2();
9f95a23c
TL
67 auth::AuthClient::auth_request_t
68 get_auth_request(const EntityName& name,
69 uint32_t want_keys);
70 using secret_t = string;
71 tuple<CryptoKey, secret_t, bufferlist>
72 handle_auth_reply_more(const ceph::buffer::list& bl);
9f95a23c
TL
73 int handle_auth_bad_method(uint32_t old_auth_method,
74 int result,
75 const std::vector<uint32_t>& allowed_methods,
76 const std::vector<uint32_t>& allowed_modes);
77
78 // v1 and v2
f67539c2
TL
79 tuple<CryptoKey, secret_t, int>
80 handle_auth_done(uint64_t new_global_id,
81 const ceph::buffer::list& bl);
82 void close();
11fdf7f2 83 bool is_my_peer(const entity_addr_t& addr) const;
9f95a23c
TL
84 AuthAuthorizer* get_authorizer(entity_type_t peer) const;
85 KeyStore& get_keys();
11fdf7f2 86 seastar::future<> renew_tickets();
9f95a23c
TL
87 seastar::future<> renew_rotating_keyring();
88
89 crimson::net::ConnectionRef get_conn();
11fdf7f2
TL
90
91private:
92 seastar::future<> setup_session(epoch_t epoch,
9f95a23c
TL
93 const EntityName& name);
94 std::unique_ptr<AuthClientHandler> create_auth(crimson::auth::method_t,
95 uint64_t global_id,
96 const EntityName& name,
97 uint32_t want_keys);
98 enum class request_t {
99 rotating,
100 general,
101 };
f67539c2
TL
102 seastar::future<std::optional<auth_result_t>> do_auth_single(request_t);
103 seastar::future<auth_result_t> do_auth(request_t);
11fdf7f2
TL
104
105private:
106 bool closed = false;
9f95a23c
TL
107 // v1
108 seastar::shared_promise<Ref<MAuthReply>> reply;
109 // v2
110 using clock_t = seastar::lowres_system_clock;
111 clock_t::time_point auth_start;
112 crimson::auth::method_t auth_method = 0;
f67539c2 113 std::optional<seastar::promise<auth_result_t>> auth_done;
9f95a23c
TL
114 // v1 and v2
115 const AuthRegistry& auth_registry;
116 crimson::net::ConnectionRef conn;
11fdf7f2 117 std::unique_ptr<AuthClientHandler> auth;
9f95a23c
TL
118 std::unique_ptr<RotatingKeyRing> rotating_keyring;
119 uint64_t global_id = 0;
120 clock_t::time_point last_rotating_renew_sent;
11fdf7f2
TL
121};
122
9f95a23c
TL
123Connection::Connection(const AuthRegistry& auth_registry,
124 crimson::net::ConnectionRef conn,
11fdf7f2 125 KeyRing* keyring)
9f95a23c
TL
126 : auth_registry{auth_registry},
127 conn{conn},
128 rotating_keyring{
129 std::make_unique<RotatingKeyRing>(nullptr,
130 CEPH_ENTITY_TYPE_OSD,
131 keyring)}
11fdf7f2
TL
132{}
133
134seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
135{
136 reply.set_value(m);
9f95a23c 137 reply = {};
11fdf7f2
TL
138 return seastar::now();
139}
140
141seastar::future<> Connection::renew_tickets()
142{
143 if (auth->need_tickets()) {
f67539c2
TL
144 return do_auth(request_t::general).then([](auth_result_t r) {
145 if (r != auth_result_t::success) {
9f95a23c
TL
146 throw std::system_error(
147 make_error_code(
148 crimson::net::error::negotiation_failure));
11fdf7f2
TL
149 }
150 });
151 }
152 return seastar::now();
153}
154
9f95a23c
TL
155seastar::future<> Connection::renew_rotating_keyring()
156{
157 auto now = clock_t::now();
158 auto ttl = std::chrono::seconds{
159 static_cast<long>(crimson::common::local_conf()->auth_service_ticket_ttl)};
160 auto cutoff = now - ttl / 4;
161 if (!rotating_keyring->need_new_secrets(utime_t(cutoff))) {
162 return seastar::now();
163 }
164 if (now - last_rotating_renew_sent < std::chrono::seconds{1}) {
165 logger().info("renew_rotating_keyring called too often");
166 return seastar::now();
167 }
168 last_rotating_renew_sent = now;
f67539c2
TL
169 return do_auth(request_t::rotating).then([](auth_result_t r) {
170 if (r != auth_result_t::success) {
9f95a23c
TL
171 throw std::system_error(make_error_code(
172 crimson::net::error::negotiation_failure));
173 }
174 });
175}
176
177AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const
178{
179 if (auth) {
180 return auth->build_authorizer(peer);
181 } else {
182 return nullptr;
183 }
184}
185
186KeyStore& Connection::get_keys() {
187 return *rotating_keyring;
188}
189
11fdf7f2 190std::unique_ptr<AuthClientHandler>
9f95a23c
TL
191Connection::create_auth(crimson::auth::method_t protocol,
192 uint64_t global_id,
11fdf7f2
TL
193 const EntityName& name,
194 uint32_t want_keys)
195{
9f95a23c 196 static crimson::common::CephContext cct;
11fdf7f2
TL
197 std::unique_ptr<AuthClientHandler> auth;
198 auth.reset(AuthClientHandler::create(&cct,
9f95a23c
TL
199 protocol,
200 rotating_keyring.get()));
11fdf7f2 201 if (!auth) {
9f95a23c 202 logger().error("no handler for protocol {}", protocol);
11fdf7f2 203 throw std::system_error(make_error_code(
9f95a23c 204 crimson::net::error::negotiation_failure));
11fdf7f2
TL
205 }
206 auth->init(name);
207 auth->set_want_keys(want_keys);
208 auth->set_global_id(global_id);
11fdf7f2
TL
209 return auth;
210}
211
212seastar::future<>
213Connection::setup_session(epoch_t epoch,
11fdf7f2
TL
214 const EntityName& name)
215{
9f95a23c
TL
216 auto m = ceph::make_message<MAuth>();
217 m->protocol = CEPH_AUTH_UNKNOWN;
11fdf7f2
TL
218 m->monmap_epoch = epoch;
219 __u8 struct_v = 1;
220 encode(struct_v, m->auth_payload);
9f95a23c
TL
221 std::vector<crimson::auth::method_t> auth_methods;
222 auth_registry.get_supported_methods(conn->get_peer_type(), &auth_methods);
223 encode(auth_methods, m->auth_payload);
11fdf7f2
TL
224 encode(name, m->auth_payload);
225 encode(global_id, m->auth_payload);
226 return conn->send(m);
227}
228
f67539c2 229seastar::future<std::optional<Connection::auth_result_t>>
9f95a23c 230Connection::do_auth_single(Connection::request_t what)
11fdf7f2
TL
231{
232 auto m = make_message<MAuth>();
233 m->protocol = auth->get_protocol();
234 auth->prepare_build_request();
9f95a23c
TL
235 switch (what) {
236 case request_t::rotating:
237 auth->build_rotating_request(m->auth_payload);
238 break;
239 case request_t::general:
240 if (int ret = auth->build_request(m->auth_payload); ret) {
241 logger().error("missing/bad key for '{}'", local_conf()->name);
242 throw std::system_error(make_error_code(
243 crimson::net::error::negotiation_failure));
244 }
245 break;
246 default:
247 assert(0);
11fdf7f2
TL
248 }
249 logger().info("sending {}", *m);
250 return conn->send(m).then([this] {
251 logger().info("waiting");
9f95a23c 252 return reply.get_shared_future();
11fdf7f2 253 }).then([this] (Ref<MAuthReply> m) {
9f95a23c
TL
254 if (!m) {
255 ceph_assert(closed);
256 logger().info("do_auth: connection closed");
f67539c2
TL
257 return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
258 std::make_optional(auth_result_t::canceled));
9f95a23c
TL
259 }
260 logger().info(
261 "do_auth: mon {} => {} returns {}: {}",
262 conn->get_messenger()->get_myaddr(),
263 conn->get_peer_addr(), *m, m->result);
11fdf7f2
TL
264 auto p = m->result_bl.cbegin();
265 auto ret = auth->handle_response(m->result, p,
11fdf7f2
TL
266 nullptr, nullptr);
267 if (ret != 0 && ret != -EAGAIN) {
9f95a23c
TL
268 logger().error(
269 "do_auth: got error {} on mon {}",
270 ret,
271 conn->get_peer_addr());
11fdf7f2 272 }
f67539c2 273 return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
9f95a23c
TL
274 ret == -EAGAIN
275 ? std::nullopt
276 : std::make_optional(ret == 0
f67539c2
TL
277 ? auth_result_t::success
278 : auth_result_t::failure
9f95a23c 279 ));
11fdf7f2
TL
280 });
281}
282
f67539c2 283seastar::future<Connection::auth_result_t>
9f95a23c
TL
284Connection::do_auth(Connection::request_t what) {
285 return seastar::repeat_until_value([this, what]() {
286 return do_auth_single(what);
287 });
288}
289
f67539c2 290seastar::future<Connection::auth_result_t>
9f95a23c
TL
291Connection::authenticate_v1(epoch_t epoch,
292 const EntityName& name,
293 uint32_t want_keys)
11fdf7f2 294{
9f95a23c
TL
295 return conn->keepalive().then([epoch, name, this] {
296 return setup_session(epoch, name);
11fdf7f2 297 }).then([this] {
9f95a23c 298 return reply.get_shared_future();
11fdf7f2 299 }).then([name, want_keys, this](Ref<MAuthReply> m) {
9f95a23c
TL
300 if (!m) {
301 logger().error("authenticate_v1 canceled on {}", name);
f67539c2 302 return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
9f95a23c 303 }
11fdf7f2 304 global_id = m->global_id;
9f95a23c 305 auth = create_auth(m->protocol, m->global_id, name, want_keys);
11fdf7f2
TL
306 switch (auto p = m->result_bl.cbegin();
307 auth->handle_response(m->result, p,
11fdf7f2
TL
308 nullptr, nullptr)) {
309 case 0:
310 // none
f67539c2 311 return seastar::make_ready_future<auth_result_t>(auth_result_t::success);
11fdf7f2
TL
312 case -EAGAIN:
313 // cephx
9f95a23c 314 return do_auth(request_t::general);
11fdf7f2
TL
315 default:
316 ceph_assert_always(0);
317 }
9f95a23c
TL
318 }).handle_exception([](auto ep) {
319 logger().error("authenticate_v1 failed with {}", ep);
f67539c2 320 return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
9f95a23c
TL
321 });
322}
323
f67539c2 324seastar::future<Connection::auth_result_t> Connection::authenticate_v2()
9f95a23c
TL
325{
326 auth_start = seastar::lowres_system_clock::now();
327 return conn->send(make_message<MMonGetMap>()).then([this] {
f67539c2
TL
328 auth_done.emplace();
329 return auth_done->get_future();
11fdf7f2
TL
330 });
331}
332
9f95a23c
TL
333auth::AuthClient::auth_request_t
334Connection::get_auth_request(const EntityName& entity_name,
335 uint32_t want_keys)
336{
337 // choose method
338 auth_method = [&] {
339 std::vector<crimson::auth::method_t> methods;
340 auth_registry.get_supported_methods(conn->get_peer_type(), &methods);
341 if (methods.empty()) {
342 logger().info("get_auth_request no methods is supported");
343 throw crimson::auth::error("no methods is supported");
344 }
345 return methods.front();
346 }();
347
348 std::vector<uint32_t> modes;
349 auth_registry.get_supported_modes(conn->get_peer_type(), auth_method,
350 &modes);
351 logger().info("method {} preferred_modes {}", auth_method, modes);
352 if (modes.empty()) {
353 throw crimson::auth::error("no modes is supported");
354 }
355 auth = create_auth(auth_method, global_id, entity_name, want_keys);
356
357 using ceph::encode;
358 bufferlist bl;
359 // initial request includes some boilerplate...
360 encode((char)AUTH_MODE_MON, bl);
361 encode(entity_name, bl);
362 encode(global_id, bl);
363 // and (maybe) some method-specific initial payload
364 auth->build_initial_request(&bl);
365 return {auth_method, modes, bl};
366}
367
368tuple<CryptoKey, Connection::secret_t, bufferlist>
369Connection::handle_auth_reply_more(const ceph::buffer::list& payload)
370{
371 CryptoKey session_key;
372 secret_t connection_secret;
373 bufferlist reply;
374 auto p = payload.cbegin();
375 int r = auth->handle_response(0, p, &session_key, &connection_secret);
376 if (r == -EAGAIN) {
377 auth->prepare_build_request();
378 auth->build_request(reply);
379 logger().info(" responding with {} bytes", reply.length());
380 return {session_key, connection_secret, reply};
381 } else if (r < 0) {
382 logger().error(" handle_response returned {}", r);
383 throw crimson::auth::error("unable to build auth");
384 } else {
385 logger().info("authenticated!");
386 std::terminate();
387 }
388}
389
390tuple<CryptoKey, Connection::secret_t, int>
391Connection::handle_auth_done(uint64_t new_global_id,
392 const ceph::buffer::list& payload)
393{
394 global_id = new_global_id;
395 auth->set_global_id(global_id);
396 auto p = payload.begin();
397 CryptoKey session_key;
398 secret_t connection_secret;
399 int r = auth->handle_response(0, p, &session_key, &connection_secret);
400 conn->set_last_keepalive_ack(auth_start);
f67539c2
TL
401 if (auth_done) {
402 auth_done->set_value(auth_result_t::success);
403 auth_done.reset();
404 }
9f95a23c
TL
405 return {session_key, connection_secret, r};
406}
407
408int Connection::handle_auth_bad_method(uint32_t old_auth_method,
409 int result,
410 const std::vector<uint32_t>& allowed_methods,
411 const std::vector<uint32_t>& allowed_modes)
412{
413 logger().info("old_auth_method {} result {} allowed_methods {}",
414 old_auth_method, cpp_strerror(result), allowed_methods);
415 std::vector<uint32_t> auth_supported;
416 auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported);
417 auto p = std::find(auth_supported.begin(), auth_supported.end(),
418 old_auth_method);
419 assert(p != auth_supported.end());
420 p = std::find_first_of(std::next(p), auth_supported.end(),
421 allowed_methods.begin(), allowed_methods.end());
422 if (p == auth_supported.end()) {
423 logger().error("server allowed_methods {} but i only support {}",
424 allowed_methods, auth_supported);
f67539c2
TL
425 assert(auth_done);
426 auth_done->set_exception(std::system_error(make_error_code(
9f95a23c
TL
427 crimson::net::error::negotiation_failure)));
428 return -EACCES;
429 }
430 auth_method = *p;
431 logger().info("will try {} next", auth_method);
432 return 0;
433}
434
f67539c2 435void Connection::close()
11fdf7f2 436{
9f95a23c
TL
437 reply.set_value(Ref<MAuthReply>(nullptr));
438 reply = {};
f67539c2
TL
439 if (auth_done) {
440 auth_done->set_value(auth_result_t::canceled);
441 auth_done.reset();
442 }
11fdf7f2 443 if (conn && !std::exchange(closed, true)) {
f67539c2 444 conn->mark_down();
11fdf7f2
TL
445 }
446}
447
448bool Connection::is_my_peer(const entity_addr_t& addr) const
449{
9f95a23c 450 ceph_assert(conn);
11fdf7f2
TL
451 return conn->get_peer_addr() == addr;
452}
453
9f95a23c 454crimson::net::ConnectionRef Connection::get_conn() {
11fdf7f2
TL
455 return conn;
456}
457
9f95a23c
TL
458Client::Client(crimson::net::Messenger& messenger,
459 crimson::common::AuthHandler& auth_handler)
11fdf7f2
TL
460 // currently, crimson is OSD-only
461 : want_keys{CEPH_ENTITY_TYPE_MON |
462 CEPH_ENTITY_TYPE_OSD |
463 CEPH_ENTITY_TYPE_MGR},
464 timer{[this] { tick(); }},
9f95a23c
TL
465 msgr{messenger},
466 auth_registry{&cct},
467 auth_handler{auth_handler}
11fdf7f2
TL
468{}
469
470Client::Client(Client&&) = default;
471Client::~Client() = default;
472
473seastar::future<> Client::start() {
9f95a23c
TL
474 entity_name = crimson::common::local_conf()->name;
475 auth_registry.refresh_config();
11fdf7f2 476 return load_keyring().then([this] {
9f95a23c 477 return monmap.build_initial(crimson::common::local_conf(), false);
11fdf7f2
TL
478 }).then([this] {
479 return authenticate();
9f95a23c
TL
480 }).then([this] {
481 auto interval =
482 std::chrono::duration_cast<seastar::lowres_clock::duration>(
483 std::chrono::duration<double>(
484 local_conf().get_val<double>("mon_client_ping_interval")));
485 timer.arm_periodic(interval);
11fdf7f2
TL
486 });
487}
488
489seastar::future<> Client::load_keyring()
490{
9f95a23c 491 if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) {
11fdf7f2
TL
492 return seastar::now();
493 } else {
9f95a23c
TL
494 return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) {
495 return crimson::auth::load_from_keyfile(keyring);
11fdf7f2 496 }).then([](KeyRing* keyring) {
9f95a23c 497 return crimson::auth::load_from_key(keyring);
11fdf7f2
TL
498 }).then([](KeyRing*) {
499 return seastar::now();
500 });
501 }
502}
503
504void Client::tick()
505{
f67539c2 506 gate.dispatch_in_background(__func__, *this, [this] {
9f95a23c
TL
507 if (active_con) {
508 return seastar::when_all_succeed(active_con->get_conn()->keepalive(),
509 active_con->renew_tickets(),
f67539c2 510 active_con->renew_rotating_keyring()).then_unpack([] {});
9f95a23c
TL
511 } else {
512 return seastar::now();
513 }
11fdf7f2
TL
514 });
515}
516
517bool Client::is_hunting() const {
518 return !active_con;
519}
520
f67539c2
TL
521std::optional<seastar::future<>>
522Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
11fdf7f2 523{
f67539c2
TL
524 bool dispatched = true;
525 gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
526 // we only care about these message types
527 switch (m->get_type()) {
528 case CEPH_MSG_MON_MAP:
529 return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m));
530 case CEPH_MSG_AUTH_REPLY:
531 return handle_auth_reply(
532 conn, boost::static_pointer_cast<MAuthReply>(m));
533 case CEPH_MSG_MON_SUBSCRIBE_ACK:
534 return handle_subscribe_ack(
535 boost::static_pointer_cast<MMonSubscribeAck>(m));
536 case CEPH_MSG_MON_GET_VERSION_REPLY:
537 return handle_get_version_reply(
538 boost::static_pointer_cast<MMonGetVersionReply>(m));
539 case MSG_MON_COMMAND_ACK:
540 return handle_mon_command_ack(
541 boost::static_pointer_cast<MMonCommandAck>(m));
542 case MSG_LOGACK:
543 return handle_log_ack(
544 boost::static_pointer_cast<MLogAck>(m));
545 case MSG_CONFIG:
546 return handle_config(
547 boost::static_pointer_cast<MConfig>(m));
548 default:
549 dispatched = false;
550 return seastar::now();
551 }
552 });
553 return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
11fdf7f2
TL
554}
555
f67539c2 556void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
11fdf7f2 557{
f67539c2
TL
558 gate.dispatch_in_background(__func__, *this, [this, conn] {
559 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
560 [peer_addr = conn->get_peer_addr()](auto& mc) {
561 return mc->is_my_peer(peer_addr);
562 });
563 if (found != pending_conns.end()) {
564 logger().warn("pending conn reset by {}", conn->get_peer_addr());
565 (*found)->close();
566 return seastar::now();
567 } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
568 logger().warn("active conn reset {}", conn->get_peer_addr());
569 active_con.reset();
570 return reopen_session(-1).then([this] {
571 send_pendings();
572 return seastar::now();
573 });
574 } else {
575 return seastar::now();
576 }
577 });
11fdf7f2
TL
578}
579
9f95a23c
TL
580std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
581Client::get_supported_auth_methods(int peer_type)
582{
583 std::vector<uint32_t> methods;
584 std::vector<uint32_t> modes;
585 auth_registry.get_supported_methods(peer_type, &methods, &modes);
586 return {methods, modes};
587}
588
589uint32_t Client::pick_con_mode(int peer_type,
590 uint32_t auth_method,
591 const std::vector<uint32_t>& preferred_modes)
592{
593 return auth_registry.pick_mode(peer_type, auth_method, preferred_modes);
594}
595
596AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type,
597 int auth_method)
598{
599 return auth_registry.get_handler(peer_type, auth_method);
600}
601
602
603int Client::handle_auth_request(crimson::net::ConnectionRef con,
604 AuthConnectionMetaRef auth_meta,
605 bool more,
606 uint32_t auth_method,
607 const ceph::bufferlist& payload,
608 ceph::bufferlist *reply)
609{
610 // for some channels prior to nautilus (osd heartbeat), we tolerate the lack of
611 // an authorizer.
612 if (payload.length() == 0) {
613 if (con->get_messenger()->get_require_authorizer()) {
614 return -EACCES;
615 } else {
616 auth_handler.handle_authentication({}, {});
617 return 1;
618 }
619 }
620 auth_meta->auth_mode = payload[0];
621 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
622 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
623 return -EACCES;
624 }
625 AuthAuthorizeHandler* ah = get_auth_authorize_handler(con->get_peer_type(),
626 auth_method);
627 if (!ah) {
628 logger().error("no AuthAuthorizeHandler found for auth method: {}",
629 auth_method);
630 return -EOPNOTSUPP;
631 }
632 auto authorizer_challenge = &auth_meta->authorizer_challenge;
f67539c2
TL
633 if (auth_meta->skip_authorizer_challenge) {
634 logger().info("skipping challenge on {}", con);
9f95a23c
TL
635 authorizer_challenge = nullptr;
636 }
637 bool was_challenge = (bool)auth_meta->authorizer_challenge;
638 EntityName name;
639 AuthCapsInfo caps_info;
640 bool is_valid = ah->verify_authorizer(
641 &cct,
642 active_con->get_keys(),
643 payload,
644 auth_meta->get_connection_secret_length(),
645 reply,
646 &name,
647 &active_con->get_conn()->peer_global_id,
648 &caps_info,
649 &auth_meta->session_key,
650 &auth_meta->connection_secret,
651 authorizer_challenge);
652 if (is_valid) {
653 auth_handler.handle_authentication(name, caps_info);
654 return 1;
655 }
656 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
657 logger().info("added challenge on {}", con);
658 return 0;
659 } else {
660 logger().info("bad authorizer on {}", con);
661 return -EACCES;
662 }
663}
664
665auth::AuthClient::auth_request_t
666Client::get_auth_request(crimson::net::ConnectionRef con,
667 AuthConnectionMetaRef auth_meta)
668{
669 logger().info("get_auth_request(con={}, auth_method={})",
670 con, auth_meta->auth_method);
671 // connection to mon?
672 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
673 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
674 [peer_addr = con->get_peer_addr()](auto& mc) {
675 return mc->is_my_peer(peer_addr);
676 });
677 if (found == pending_conns.end()) {
678 throw crimson::auth::error{"unknown connection"};
679 }
680 return (*found)->get_auth_request(entity_name, want_keys);
681 } else {
682 // generate authorizer
683 if (!active_con) {
684 logger().error(" but no auth handler is set up");
685 throw crimson::auth::error("no auth available");
686 }
687 auto authorizer = active_con->get_authorizer(con->get_peer_type());
688 if (!authorizer) {
689 logger().error("failed to build_authorizer for type {}",
690 ceph_entity_type_name(con->get_peer_type()));
691 throw crimson::auth::error("unable to build auth");
692 }
693 auth_meta->authorizer.reset(authorizer);
694 auth_meta->auth_method = authorizer->protocol;
695 vector<uint32_t> modes;
696 auth_registry.get_supported_modes(con->get_peer_type(),
697 auth_meta->auth_method,
698 &modes);
699 return {authorizer->protocol, modes, authorizer->bl};
700 }
701}
702
703ceph::bufferlist Client::handle_auth_reply_more(crimson::net::ConnectionRef conn,
704 AuthConnectionMetaRef auth_meta,
705 const bufferlist& bl)
706{
707 if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
708 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
709 [peer_addr = conn->get_peer_addr()](auto& mc) {
710 return mc->is_my_peer(peer_addr);
711 });
712 if (found == pending_conns.end()) {
713 throw crimson::auth::error{"unknown connection"};
714 }
715 bufferlist reply;
716 tie(auth_meta->session_key, auth_meta->connection_secret, reply) =
717 (*found)->handle_auth_reply_more(bl);
718 return reply;
719 } else {
720 // authorizer challenges
721 if (!active_con || !auth_meta->authorizer) {
722 logger().error("no authorizer?");
723 throw crimson::auth::error("no auth available");
724 }
725 auth_meta->authorizer->add_challenge(&cct, bl);
726 return auth_meta->authorizer->bl;
727 }
728}
729
730int Client::handle_auth_done(crimson::net::ConnectionRef conn,
731 AuthConnectionMetaRef auth_meta,
732 uint64_t global_id,
733 uint32_t con_mode,
734 const bufferlist& bl)
735{
736 if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
737 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
738 [peer_addr = conn->get_peer_addr()](auto& mc) {
739 return mc->is_my_peer(peer_addr);
740 });
741 if (found == pending_conns.end()) {
742 return -ENOENT;
743 }
744 int r = 0;
745 tie(auth_meta->session_key, auth_meta->connection_secret, r) =
746 (*found)->handle_auth_done(global_id, bl);
747 return r;
748 } else {
749 // verify authorizer reply
750 auto p = bl.begin();
751 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
752 logger().error("failed verifying authorizer reply");
753 return -EACCES;
754 }
755 auth_meta->session_key = auth_meta->authorizer->session_key;
756 return 0;
757 }
758}
759
760 // Handle server's indication that the previous auth attempt failed
761int Client::handle_auth_bad_method(crimson::net::ConnectionRef conn,
762 AuthConnectionMetaRef auth_meta,
763 uint32_t old_auth_method,
764 int result,
765 const std::vector<uint32_t>& allowed_methods,
766 const std::vector<uint32_t>& allowed_modes)
767{
768 if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
769 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
770 [peer_addr = conn->get_peer_addr()](auto& mc) {
771 return mc->is_my_peer(peer_addr);
772 });
773 if (found != pending_conns.end()) {
774 return (*found)->handle_auth_bad_method(
775 old_auth_method, result,
776 allowed_methods, allowed_modes);
777 } else {
778 return -ENOENT;
779 }
780 } else {
781 // huh...
782 logger().info("hmm, they didn't like {} result {}",
783 old_auth_method, cpp_strerror(result));
784 return -EACCES;
785 }
786}
787
f67539c2 788seastar::future<> Client::handle_monmap(crimson::net::ConnectionRef conn,
11fdf7f2
TL
789 Ref<MMonMap> m)
790{
791 monmap.decode(m->monmapbl);
792 const auto peer_addr = conn->get_peer_addr();
793 auto cur_mon = monmap.get_name(peer_addr);
794 logger().info("got monmap {}, mon.{}, is now rank {}",
795 monmap.epoch, cur_mon, monmap.get_rank(cur_mon));
796 sub.got("monmap", monmap.get_epoch());
797
798 if (monmap.get_addr_name(peer_addr, cur_mon)) {
9f95a23c
TL
799 if (active_con) {
800 logger().info("handle_monmap: renewing tickets");
801 return seastar::when_all_succeed(
802 active_con->renew_tickets(),
f67539c2 803 active_con->renew_rotating_keyring()).then_unpack([](){
9f95a23c
TL
804 logger().info("handle_mon_map: renewed tickets");
805 });
806 } else {
807 return seastar::now();
808 }
11fdf7f2
TL
809 } else {
810 logger().warn("mon.{} went away", cur_mon);
f67539c2
TL
811 return reopen_session(-1).then([this] {
812 send_pendings();
813 return seastar::now();
814 });
11fdf7f2
TL
815 }
816}
817
f67539c2
TL
818seastar::future<> Client::handle_auth_reply(crimson::net::ConnectionRef conn,
819 Ref<MAuthReply> m)
11fdf7f2 820{
9f95a23c
TL
821 logger().info(
822 "handle_auth_reply mon {} => {} returns {}: {}",
823 conn->get_messenger()->get_myaddr(),
824 conn->get_peer_addr(), *m, m->result);
11fdf7f2
TL
825 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
826 [peer_addr = conn->get_peer_addr()](auto& mc) {
9f95a23c 827 return mc->is_my_peer(peer_addr);
11fdf7f2
TL
828 });
829 if (found != pending_conns.end()) {
9f95a23c 830 return (*found)->handle_auth_reply(m);
11fdf7f2
TL
831 } else if (active_con) {
832 return active_con->handle_auth_reply(m);
833 } else {
834 logger().error("unknown auth reply from {}", conn->get_peer_addr());
835 return seastar::now();
836 }
837}
838
839seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m)
840{
841 sub.acked(m->interval);
842 return seastar::now();
843}
844
845Client::get_version_t Client::get_version(const std::string& map)
846{
847 auto m = make_message<MMonGetVersion>();
848 auto tid = ++last_version_req_id;
849 m->handle = tid;
850 m->what = map;
851 auto& req = version_reqs[tid];
f67539c2 852 return send_message(m).then([&req] {
11fdf7f2
TL
853 return req.get_future();
854 });
855}
856
857seastar::future<>
858Client::handle_get_version_reply(Ref<MMonGetVersionReply> m)
859{
860 if (auto found = version_reqs.find(m->handle);
861 found != version_reqs.end()) {
862 auto& result = found->second;
863 logger().trace("{}: {} returns {}",
864 __func__, m->handle, m->version);
f67539c2 865 result.set_value(std::make_tuple(m->version, m->oldest_version));
11fdf7f2
TL
866 version_reqs.erase(found);
867 } else {
868 logger().warn("{}: version request with handle {} not found",
869 __func__, m->handle);
870 }
871 return seastar::now();
872}
873
874seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m)
875{
876 const auto tid = m->get_tid();
877 if (auto found = mon_commands.find(tid);
878 found != mon_commands.end()) {
879 auto& result = found->second;
880 logger().trace("{} {}", __func__, tid);
f67539c2 881 result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data())));
11fdf7f2
TL
882 mon_commands.erase(found);
883 } else {
884 logger().warn("{} {} not found", __func__, tid);
885 }
886 return seastar::now();
887}
888
889seastar::future<> Client::handle_log_ack(Ref<MLogAck> m)
890{
891 // XXX
892 return seastar::now();
893}
894
895seastar::future<> Client::handle_config(Ref<MConfig> m)
896{
9f95a23c 897 return crimson::common::local_conf().set_mon_vals(m->config);
11fdf7f2
TL
898}
899
900std::vector<unsigned> Client::get_random_mons(unsigned n) const
901{
902 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
903 for (const auto& m : monmap.mon_info) {
904 if (m.second.priority < min_priority) {
905 min_priority = m.second.priority;
906 }
907 }
908 vector<unsigned> ranks;
909 for (auto [name, info] : monmap.mon_info) {
11fdf7f2
TL
910 if (info.priority == min_priority) {
911 ranks.push_back(monmap.get_rank(name));
912 }
913 }
914 std::random_device rd;
915 std::default_random_engine rng{rd()};
916 std::shuffle(ranks.begin(), ranks.end(), rng);
917 if (n == 0 || n > ranks.size()) {
918 return ranks;
919 } else {
920 return {ranks.begin(), ranks.begin() + n};
921 }
922}
923
924seastar::future<> Client::authenticate()
925{
f67539c2
TL
926 return reopen_session(-1).then([this] {
927 send_pendings();
928 return seastar::now();
929 });
11fdf7f2
TL
930}
931
932seastar::future<> Client::stop()
933{
f67539c2
TL
934 logger().info("{}", __func__);
935 auto fut = gate.close();
936 timer.cancel();
937 for (auto& pending_con : pending_conns) {
938 pending_con->close();
939 }
940 if (active_con) {
941 active_con->close();
942 }
943 return fut;
11fdf7f2
TL
944}
945
946seastar::future<> Client::reopen_session(int rank)
947{
9f95a23c 948 logger().info("{} to mon.{}", __func__, rank);
11fdf7f2
TL
949 vector<unsigned> mons;
950 if (rank >= 0) {
951 mons.push_back(rank);
952 } else {
953 const auto parallel =
9f95a23c 954 crimson::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel");
11fdf7f2
TL
955 mons = get_random_mons(parallel);
956 }
957 pending_conns.reserve(mons.size());
958 return seastar::parallel_for_each(mons, [this](auto rank) {
f67539c2
TL
959 // TODO: connect to multiple addrs
960 auto peer = monmap.get_addrs(rank).pick_addr(msgr.get_myaddr().get_type());
961 if (peer == entity_addr_t{}) {
962 // crimson msgr only uses the first bound addr
963 logger().warn("mon.{} does not have an addr compatible with me", rank);
964 return seastar::now();
965 }
11fdf7f2 966 logger().info("connecting to mon.{}", rank);
f67539c2
TL
967 return seastar::futurize_invoke(
968 [peer, this] () -> seastar::future<Connection::auth_result_t> {
9f95a23c
TL
969 auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
970 auto& mc = pending_conns.emplace_back(
971 std::make_unique<Connection>(auth_registry, conn, &keyring));
972 if (conn->get_peer_addr().is_msgr2()) {
973 return mc->authenticate_v2();
974 } else {
975 return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
976 .handle_exception([conn](auto ep) {
f67539c2
TL
977 conn->mark_down();
978 return seastar::make_exception_future<Connection::auth_result_t>(ep);
9f95a23c
TL
979 });
980 }
981 }).then([peer, this](auto result) {
f67539c2
TL
982 if (result == Connection::auth_result_t::success) {
983 _finish_auth(peer);
9f95a23c 984 }
9f95a23c
TL
985 logger().debug("reopen_session mon connection attempts complete");
986 }).handle_exception([](auto ep) {
987 logger().error("mon connections failed with ep {}", ep);
988 return seastar::make_exception_future(ep);
11fdf7f2
TL
989 });
990 }).then([this] {
f67539c2
TL
991 if (!active_con) {
992 return seastar::make_exception_future(
993 crimson::common::system_shutdown_exception());
994 }
9f95a23c 995 return active_con->renew_rotating_keyring();
11fdf7f2
TL
996 });
997}
998
f67539c2
TL
999void Client::_finish_auth(const entity_addr_t& peer)
1000{
1001 if (!is_hunting()) {
1002 return;
1003 }
1004 logger().info("found mon.{}", monmap.get_name(peer));
1005
1006 auto found = std::find_if(
1007 pending_conns.begin(), pending_conns.end(),
1008 [peer](auto& conn) {
1009 return conn->is_my_peer(peer);
1010 });
1011 if (found == pending_conns.end()) {
1012 // Happens if another connection has won the race
1013 ceph_assert(active_con && pending_conns.empty());
1014 logger().info("no pending connection for mon.{}, peer {}",
1015 monmap.get_name(peer), peer);
1016 return;
1017 }
1018
1019 ceph_assert(!active_con && !pending_conns.empty());
1020 active_con = std::move(*found);
1021 found->reset();
1022 for (auto& conn : pending_conns) {
1023 if (conn) {
1024 conn->close();
1025 }
1026 }
1027 pending_conns.clear();
1028}
1029
11fdf7f2
TL
1030Client::command_result_t
1031Client::run_command(const std::vector<std::string>& cmd,
1032 const bufferlist& bl)
1033{
1034 auto m = make_message<MMonCommand>(monmap.fsid);
1035 auto tid = ++last_mon_command_id;
1036 m->set_tid(tid);
1037 m->cmd = cmd;
1038 m->set_data(bl);
1039 auto& req = mon_commands[tid];
f67539c2 1040 return send_message(m).then([&req] {
11fdf7f2
TL
1041 return req.get_future();
1042 });
1043}
1044
1045seastar::future<> Client::send_message(MessageRef m)
1046{
f67539c2
TL
1047 if (active_con) {
1048 if (!pending_messages.empty()) {
1049 send_pendings();
1050 }
1051 return active_con->get_conn()->send(m);
1052 }
1053 auto& delayed = pending_messages.emplace_back(m);
1054 return delayed.pr.get_future();
1055}
1056
1057void Client::send_pendings()
1058{
1059 if (active_con) {
1060 for (auto& m : pending_messages) {
1061 (void) active_con->get_conn()->send(m.msg);
1062 m.pr.set_value();
1063 }
1064 pending_messages.clear();
1065 }
11fdf7f2
TL
1066}
1067
1068bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
1069{
1070 return sub.want(what, start, flags);
1071}
1072
1073void Client::sub_got(const std::string& what, version_t have)
1074{
1075 sub.got(what, have);
1076}
1077
1078void Client::sub_unwant(const std::string& what)
1079{
1080 sub.unwant(what);
1081}
1082
1083bool Client::sub_want_increment(const std::string& what,
1084 version_t start,
1085 unsigned flags)
1086{
1087 return sub.inc_want(what, start, flags);
1088}
1089
1090seastar::future<> Client::renew_subs()
1091{
1092 if (!sub.have_new()) {
1093 logger().warn("{} - empty", __func__);
1094 return seastar::now();
1095 }
1096 logger().trace("{}", __func__);
1097
1098 auto m = make_message<MMonSubscribe>();
1099 m->what = sub.get_subs();
1100 m->hostname = ceph_get_short_hostname();
f67539c2 1101 return send_message(m).then([this] {
11fdf7f2
TL
1102 sub.renewed();
1103 });
1104}
1105
f67539c2
TL
1106void Client::print(std::ostream& out) const
1107{
1108 out << "mon." << entity_name;
1109}
1110
9f95a23c 1111} // namespace crimson::mon