]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/mon/MonClient.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / mon / MonClient.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
11fdf7f2
TL
4#include "MonClient.h"
5
6#include <random>
1e59de90 7#include <fmt/ranges.h>
11fdf7f2
TL
8#include <seastar/core/future-util.hh>
9#include <seastar/core/lowres_clock.hh>
9f95a23c 10#include <seastar/core/shared_future.hh>
11fdf7f2
TL
11#include <seastar/util/log.hh>
12
13#include "auth/AuthClientHandler.h"
11fdf7f2
TL
14#include "auth/RotatingKeyRing.h"
15
16#include "common/hostname.h"
17
18#include "crimson/auth/KeyRing.h"
19#include "crimson/common/config_proxy.h"
20#include "crimson/common/log.h"
20effc67 21#include "crimson/common/logclient.h"
11fdf7f2
TL
22#include "crimson/net/Connection.h"
23#include "crimson/net/Errors.h"
24#include "crimson/net/Messenger.h"
25
26#include "messages/MAuth.h"
27#include "messages/MAuthReply.h"
28#include "messages/MConfig.h"
29#include "messages/MLogAck.h"
30#include "messages/MMonCommand.h"
31#include "messages/MMonCommandAck.h"
9f95a23c 32#include "messages/MMonGetMap.h"
11fdf7f2
TL
33#include "messages/MMonGetVersion.h"
34#include "messages/MMonGetVersionReply.h"
35#include "messages/MMonMap.h"
36#include "messages/MMonSubscribe.h"
37#include "messages/MMonSubscribeAck.h"
38
20effc67
TL
39using std::string;
40using std::tuple;
41using std::vector;
42
11fdf7f2
TL
43namespace {
44 seastar::logger& logger()
45 {
9f95a23c 46 return crimson::get_logger(ceph_subsys_monc);
11fdf7f2
TL
47 }
48}
49
9f95a23c 50namespace crimson::mon {
11fdf7f2 51
9f95a23c 52using crimson::common::local_conf;
11fdf7f2 53
20effc67 54class Connection : public seastar::enable_shared_from_this<Connection> {
11fdf7f2 55public:
9f95a23c
TL
56 Connection(const AuthRegistry& auth_registry,
57 crimson::net::ConnectionRef conn,
58 KeyRing* keyring);
f67539c2 59 enum class auth_result_t {
9f95a23c
TL
60 success = 0,
61 failure,
62 canceled
63 };
11fdf7f2 64 seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
9f95a23c 65 // v2
f67539c2 66 seastar::future<auth_result_t> authenticate_v2();
9f95a23c
TL
67 auth::AuthClient::auth_request_t
68 get_auth_request(const EntityName& name,
69 uint32_t want_keys);
70 using secret_t = string;
71 tuple<CryptoKey, secret_t, bufferlist>
72 handle_auth_reply_more(const ceph::buffer::list& bl);
9f95a23c
TL
73 int handle_auth_bad_method(uint32_t old_auth_method,
74 int result,
75 const std::vector<uint32_t>& allowed_methods,
76 const std::vector<uint32_t>& allowed_modes);
f67539c2
TL
77 tuple<CryptoKey, secret_t, int>
78 handle_auth_done(uint64_t new_global_id,
79 const ceph::buffer::list& bl);
80 void close();
11fdf7f2 81 bool is_my_peer(const entity_addr_t& addr) const;
9f95a23c
TL
82 AuthAuthorizer* get_authorizer(entity_type_t peer) const;
83 KeyStore& get_keys();
11fdf7f2 84 seastar::future<> renew_tickets();
9f95a23c
TL
85 seastar::future<> renew_rotating_keyring();
86
87 crimson::net::ConnectionRef get_conn();
11fdf7f2
TL
88
89private:
9f95a23c
TL
90 std::unique_ptr<AuthClientHandler> create_auth(crimson::auth::method_t,
91 uint64_t global_id,
92 const EntityName& name,
93 uint32_t want_keys);
94 enum class request_t {
95 rotating,
96 general,
97 };
f67539c2
TL
98 seastar::future<std::optional<auth_result_t>> do_auth_single(request_t);
99 seastar::future<auth_result_t> do_auth(request_t);
11fdf7f2
TL
100
101private:
102 bool closed = false;
20effc67 103 seastar::shared_promise<Ref<MAuthReply>> auth_reply;
9f95a23c
TL
104 // v2
105 using clock_t = seastar::lowres_system_clock;
106 clock_t::time_point auth_start;
107 crimson::auth::method_t auth_method = 0;
f67539c2 108 std::optional<seastar::promise<auth_result_t>> auth_done;
9f95a23c
TL
109 const AuthRegistry& auth_registry;
110 crimson::net::ConnectionRef conn;
11fdf7f2 111 std::unique_ptr<AuthClientHandler> auth;
9f95a23c
TL
112 std::unique_ptr<RotatingKeyRing> rotating_keyring;
113 uint64_t global_id = 0;
114 clock_t::time_point last_rotating_renew_sent;
11fdf7f2
TL
115};
116
9f95a23c
TL
117Connection::Connection(const AuthRegistry& auth_registry,
118 crimson::net::ConnectionRef conn,
11fdf7f2 119 KeyRing* keyring)
9f95a23c
TL
120 : auth_registry{auth_registry},
121 conn{conn},
122 rotating_keyring{
123 std::make_unique<RotatingKeyRing>(nullptr,
124 CEPH_ENTITY_TYPE_OSD,
125 keyring)}
11fdf7f2
TL
126{}
127
128seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
129{
20effc67
TL
130 logger().info("{}", __func__);
131 ceph_assert(m);
132 auth_reply.set_value(m);
133 auth_reply = {};
11fdf7f2
TL
134 return seastar::now();
135}
136
137seastar::future<> Connection::renew_tickets()
138{
139 if (auth->need_tickets()) {
1e59de90 140 logger().info("{}: retrieving new tickets", __func__);
20effc67
TL
141 return do_auth(request_t::general).then([](const auth_result_t r) {
142 if (r == auth_result_t::failure) {
1e59de90 143 logger().info("renew_tickets: ignoring failed auth reply");
11fdf7f2
TL
144 }
145 });
1e59de90
TL
146 } else {
147 logger().debug("{}: don't need new tickets", __func__);
148 return seastar::now();
11fdf7f2 149 }
11fdf7f2
TL
150}
151
9f95a23c
TL
152seastar::future<> Connection::renew_rotating_keyring()
153{
154 auto now = clock_t::now();
155 auto ttl = std::chrono::seconds{
156 static_cast<long>(crimson::common::local_conf()->auth_service_ticket_ttl)};
1e59de90
TL
157 auto cutoff = utime_t{now - std::min(std::chrono::seconds{30}, ttl / 4)};
158 if (!rotating_keyring->need_new_secrets(cutoff)) {
159 logger().debug("renew_rotating_keyring secrets are up-to-date "
160 "(they expire after {})", cutoff);
9f95a23c 161 return seastar::now();
1e59de90
TL
162 } else {
163 logger().info("renew_rotating_keyring renewing rotating keys "
164 " (they expired before {})", cutoff);
9f95a23c 165 }
1e59de90
TL
166 if ((now > last_rotating_renew_sent) &&
167 (now - last_rotating_renew_sent < std::chrono::seconds{1})) {
168 logger().info("renew_rotating_keyring called too often (last: {})",
169 utime_t{last_rotating_renew_sent});
9f95a23c
TL
170 return seastar::now();
171 }
172 last_rotating_renew_sent = now;
20effc67
TL
173 return do_auth(request_t::rotating).then([](const auth_result_t r) {
174 if (r == auth_result_t::failure) {
1e59de90 175 logger().info("renew_rotating_keyring: ignoring failed auth reply");
9f95a23c
TL
176 }
177 });
178}
179
180AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const
181{
182 if (auth) {
183 return auth->build_authorizer(peer);
184 } else {
185 return nullptr;
186 }
187}
188
189KeyStore& Connection::get_keys() {
190 return *rotating_keyring;
191}
192
11fdf7f2 193std::unique_ptr<AuthClientHandler>
9f95a23c
TL
194Connection::create_auth(crimson::auth::method_t protocol,
195 uint64_t global_id,
11fdf7f2
TL
196 const EntityName& name,
197 uint32_t want_keys)
198{
9f95a23c 199 static crimson::common::CephContext cct;
11fdf7f2
TL
200 std::unique_ptr<AuthClientHandler> auth;
201 auth.reset(AuthClientHandler::create(&cct,
9f95a23c
TL
202 protocol,
203 rotating_keyring.get()));
11fdf7f2 204 if (!auth) {
9f95a23c 205 logger().error("no handler for protocol {}", protocol);
11fdf7f2 206 throw std::system_error(make_error_code(
9f95a23c 207 crimson::net::error::negotiation_failure));
11fdf7f2
TL
208 }
209 auth->init(name);
210 auth->set_want_keys(want_keys);
211 auth->set_global_id(global_id);
11fdf7f2
TL
212 return auth;
213}
214
f67539c2 215seastar::future<std::optional<Connection::auth_result_t>>
9f95a23c 216Connection::do_auth_single(Connection::request_t what)
11fdf7f2 217{
20effc67 218 auto m = crimson::make_message<MAuth>();
11fdf7f2
TL
219 m->protocol = auth->get_protocol();
220 auth->prepare_build_request();
9f95a23c
TL
221 switch (what) {
222 case request_t::rotating:
223 auth->build_rotating_request(m->auth_payload);
224 break;
225 case request_t::general:
226 if (int ret = auth->build_request(m->auth_payload); ret) {
227 logger().error("missing/bad key for '{}'", local_conf()->name);
228 throw std::system_error(make_error_code(
229 crimson::net::error::negotiation_failure));
230 }
231 break;
232 default:
233 assert(0);
11fdf7f2
TL
234 }
235 logger().info("sending {}", *m);
20effc67 236 return conn->send(std::move(m)).then([this] {
11fdf7f2 237 logger().info("waiting");
20effc67
TL
238 return auth_reply.get_shared_future();
239 }).then([this, life_extender=shared_from_this()] (Ref<MAuthReply> m) {
9f95a23c
TL
240 if (!m) {
241 ceph_assert(closed);
20effc67
TL
242 logger().info("do_auth_single: connection closed");
243 return std::make_optional(auth_result_t::canceled);
9f95a23c 244 }
1e59de90
TL
245 logger().info("do_auth_single: {} returns {}: {}",
246 *conn, *m, m->result);
11fdf7f2
TL
247 auto p = m->result_bl.cbegin();
248 auto ret = auth->handle_response(m->result, p,
11fdf7f2 249 nullptr, nullptr);
20effc67
TL
250 std::optional<Connection::auth_result_t> auth_result;
251 switch (ret) {
252 case -EAGAIN:
253 auth_result = std::nullopt;
254 break;
255 case 0:
256 auth_result = auth_result_t::success;
257 break;
258 default:
259 auth_result = auth_result_t::failure;
9f95a23c 260 logger().error(
20effc67
TL
261 "do_auth_single: got error {} on mon {}",
262 ret, conn->get_peer_addr());
263 break;
11fdf7f2 264 }
20effc67 265 return auth_result;
11fdf7f2
TL
266 });
267}
268
f67539c2 269seastar::future<Connection::auth_result_t>
9f95a23c 270Connection::do_auth(Connection::request_t what) {
20effc67
TL
271 return seastar::repeat_until_value(
272 [this, life_extender=shared_from_this(), what]() {
9f95a23c
TL
273 return do_auth_single(what);
274 });
275}
276
f67539c2 277seastar::future<Connection::auth_result_t> Connection::authenticate_v2()
9f95a23c
TL
278{
279 auth_start = seastar::lowres_system_clock::now();
20effc67 280 return conn->send(crimson::make_message<MMonGetMap>()).then([this] {
f67539c2
TL
281 auth_done.emplace();
282 return auth_done->get_future();
11fdf7f2
TL
283 });
284}
285
9f95a23c
TL
286auth::AuthClient::auth_request_t
287Connection::get_auth_request(const EntityName& entity_name,
288 uint32_t want_keys)
289{
290 // choose method
291 auth_method = [&] {
292 std::vector<crimson::auth::method_t> methods;
293 auth_registry.get_supported_methods(conn->get_peer_type(), &methods);
294 if (methods.empty()) {
295 logger().info("get_auth_request no methods is supported");
296 throw crimson::auth::error("no methods is supported");
297 }
298 return methods.front();
299 }();
300
301 std::vector<uint32_t> modes;
302 auth_registry.get_supported_modes(conn->get_peer_type(), auth_method,
303 &modes);
304 logger().info("method {} preferred_modes {}", auth_method, modes);
305 if (modes.empty()) {
306 throw crimson::auth::error("no modes is supported");
307 }
308 auth = create_auth(auth_method, global_id, entity_name, want_keys);
309
310 using ceph::encode;
311 bufferlist bl;
312 // initial request includes some boilerplate...
313 encode((char)AUTH_MODE_MON, bl);
314 encode(entity_name, bl);
315 encode(global_id, bl);
316 // and (maybe) some method-specific initial payload
317 auth->build_initial_request(&bl);
318 return {auth_method, modes, bl};
319}
320
321tuple<CryptoKey, Connection::secret_t, bufferlist>
322Connection::handle_auth_reply_more(const ceph::buffer::list& payload)
323{
324 CryptoKey session_key;
325 secret_t connection_secret;
326 bufferlist reply;
327 auto p = payload.cbegin();
328 int r = auth->handle_response(0, p, &session_key, &connection_secret);
329 if (r == -EAGAIN) {
330 auth->prepare_build_request();
331 auth->build_request(reply);
332 logger().info(" responding with {} bytes", reply.length());
333 return {session_key, connection_secret, reply};
334 } else if (r < 0) {
335 logger().error(" handle_response returned {}", r);
336 throw crimson::auth::error("unable to build auth");
337 } else {
338 logger().info("authenticated!");
339 std::terminate();
340 }
341}
342
343tuple<CryptoKey, Connection::secret_t, int>
344Connection::handle_auth_done(uint64_t new_global_id,
345 const ceph::buffer::list& payload)
346{
347 global_id = new_global_id;
348 auth->set_global_id(global_id);
349 auto p = payload.begin();
350 CryptoKey session_key;
351 secret_t connection_secret;
352 int r = auth->handle_response(0, p, &session_key, &connection_secret);
353 conn->set_last_keepalive_ack(auth_start);
f67539c2
TL
354 if (auth_done) {
355 auth_done->set_value(auth_result_t::success);
356 auth_done.reset();
357 }
9f95a23c
TL
358 return {session_key, connection_secret, r};
359}
360
361int Connection::handle_auth_bad_method(uint32_t old_auth_method,
362 int result,
363 const std::vector<uint32_t>& allowed_methods,
364 const std::vector<uint32_t>& allowed_modes)
365{
366 logger().info("old_auth_method {} result {} allowed_methods {}",
367 old_auth_method, cpp_strerror(result), allowed_methods);
368 std::vector<uint32_t> auth_supported;
369 auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported);
370 auto p = std::find(auth_supported.begin(), auth_supported.end(),
371 old_auth_method);
372 assert(p != auth_supported.end());
373 p = std::find_first_of(std::next(p), auth_supported.end(),
374 allowed_methods.begin(), allowed_methods.end());
375 if (p == auth_supported.end()) {
376 logger().error("server allowed_methods {} but i only support {}",
377 allowed_methods, auth_supported);
f67539c2
TL
378 assert(auth_done);
379 auth_done->set_exception(std::system_error(make_error_code(
9f95a23c
TL
380 crimson::net::error::negotiation_failure)));
381 return -EACCES;
382 }
383 auth_method = *p;
384 logger().info("will try {} next", auth_method);
385 return 0;
386}
387
f67539c2 388void Connection::close()
11fdf7f2 389{
20effc67
TL
390 logger().info("{}", __func__);
391 auth_reply.set_value(Ref<MAuthReply>(nullptr));
392 auth_reply = {};
f67539c2
TL
393 if (auth_done) {
394 auth_done->set_value(auth_result_t::canceled);
395 auth_done.reset();
396 }
11fdf7f2 397 if (conn && !std::exchange(closed, true)) {
f67539c2 398 conn->mark_down();
11fdf7f2
TL
399 }
400}
401
402bool Connection::is_my_peer(const entity_addr_t& addr) const
403{
9f95a23c 404 ceph_assert(conn);
11fdf7f2
TL
405 return conn->get_peer_addr() == addr;
406}
407
9f95a23c 408crimson::net::ConnectionRef Connection::get_conn() {
11fdf7f2
TL
409 return conn;
410}
411
1e59de90
TL
412Client::mon_command_t::mon_command_t(MURef<MMonCommand> req)
413 : req(std::move(req))
20effc67
TL
414{}
415
9f95a23c
TL
416Client::Client(crimson::net::Messenger& messenger,
417 crimson::common::AuthHandler& auth_handler)
11fdf7f2
TL
418 // currently, crimson is OSD-only
419 : want_keys{CEPH_ENTITY_TYPE_MON |
420 CEPH_ENTITY_TYPE_OSD |
421 CEPH_ENTITY_TYPE_MGR},
422 timer{[this] { tick(); }},
9f95a23c 423 msgr{messenger},
20effc67 424 log_client{nullptr},
9f95a23c
TL
425 auth_registry{&cct},
426 auth_handler{auth_handler}
11fdf7f2
TL
427{}
428
429Client::Client(Client&&) = default;
430Client::~Client() = default;
431
432seastar::future<> Client::start() {
9f95a23c
TL
433 entity_name = crimson::common::local_conf()->name;
434 auth_registry.refresh_config();
11fdf7f2 435 return load_keyring().then([this] {
9f95a23c 436 return monmap.build_initial(crimson::common::local_conf(), false);
11fdf7f2
TL
437 }).then([this] {
438 return authenticate();
9f95a23c
TL
439 }).then([this] {
440 auto interval =
441 std::chrono::duration_cast<seastar::lowres_clock::duration>(
442 std::chrono::duration<double>(
443 local_conf().get_val<double>("mon_client_ping_interval")));
444 timer.arm_periodic(interval);
11fdf7f2
TL
445 });
446}
447
448seastar::future<> Client::load_keyring()
449{
9f95a23c 450 if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) {
11fdf7f2
TL
451 return seastar::now();
452 } else {
9f95a23c
TL
453 return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) {
454 return crimson::auth::load_from_keyfile(keyring);
11fdf7f2 455 }).then([](KeyRing* keyring) {
9f95a23c 456 return crimson::auth::load_from_key(keyring);
11fdf7f2
TL
457 }).then([](KeyRing*) {
458 return seastar::now();
459 });
460 }
461}
462
463void Client::tick()
464{
f67539c2 465 gate.dispatch_in_background(__func__, *this, [this] {
9f95a23c 466 if (active_con) {
20effc67 467 return seastar::when_all_succeed(wait_for_send_log(),
1e59de90 468 active_con->get_conn()->send_keepalive(),
9f95a23c 469 active_con->renew_tickets(),
1e59de90 470 active_con->renew_rotating_keyring()).discard_result();
9f95a23c 471 } else {
20effc67
TL
472 assert(is_hunting());
473 logger().info("{} continuing the hunt", __func__);
474 return authenticate();
9f95a23c 475 }
11fdf7f2
TL
476 });
477}
478
20effc67
TL
479seastar::future<> Client::wait_for_send_log() {
480 utime_t now = ceph_clock_now();
481 if (now > last_send_log + cct._conf->mon_client_log_interval) {
482 last_send_log = now;
483 return send_log(log_flushing_t::NO_FLUSH);
484 }
485 return seastar::now();
486}
487
488seastar::future<> Client::send_log(log_flushing_t flush_flag) {
489 if (log_client) {
490 if (auto lm = log_client->get_mon_log_message(flush_flag); lm) {
491 return send_message(std::move(lm));
492 }
493 more_log_pending = log_client->are_pending();
494 }
495 return seastar::now();
496}
497
11fdf7f2
TL
498bool Client::is_hunting() const {
499 return !active_con;
500}
501
f67539c2
TL
502std::optional<seastar::future<>>
503Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
11fdf7f2 504{
f67539c2
TL
505 bool dispatched = true;
506 gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
507 // we only care about these message types
508 switch (m->get_type()) {
509 case CEPH_MSG_MON_MAP:
1e59de90 510 return handle_monmap(*conn, boost::static_pointer_cast<MMonMap>(m));
f67539c2
TL
511 case CEPH_MSG_AUTH_REPLY:
512 return handle_auth_reply(
1e59de90 513 *conn, boost::static_pointer_cast<MAuthReply>(m));
f67539c2
TL
514 case CEPH_MSG_MON_SUBSCRIBE_ACK:
515 return handle_subscribe_ack(
516 boost::static_pointer_cast<MMonSubscribeAck>(m));
517 case CEPH_MSG_MON_GET_VERSION_REPLY:
518 return handle_get_version_reply(
519 boost::static_pointer_cast<MMonGetVersionReply>(m));
520 case MSG_MON_COMMAND_ACK:
521 return handle_mon_command_ack(
522 boost::static_pointer_cast<MMonCommandAck>(m));
523 case MSG_LOGACK:
524 return handle_log_ack(
525 boost::static_pointer_cast<MLogAck>(m));
526 case MSG_CONFIG:
527 return handle_config(
528 boost::static_pointer_cast<MConfig>(m));
529 default:
530 dispatched = false;
531 return seastar::now();
532 }
533 });
534 return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
11fdf7f2
TL
535}
536
f67539c2 537void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
11fdf7f2 538{
f67539c2
TL
539 gate.dispatch_in_background(__func__, *this, [this, conn] {
540 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
541 [peer_addr = conn->get_peer_addr()](auto& mc) {
542 return mc->is_my_peer(peer_addr);
543 });
544 if (found != pending_conns.end()) {
545 logger().warn("pending conn reset by {}", conn->get_peer_addr());
546 (*found)->close();
20effc67 547 pending_conns.erase(found);
f67539c2
TL
548 return seastar::now();
549 } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
550 logger().warn("active conn reset {}", conn->get_peer_addr());
20effc67
TL
551 return reopen_session(-1).then([this](bool opened) {
552 if (opened) {
553 return on_session_opened();
554 } else {
555 return seastar::now();
556 }
f67539c2
TL
557 });
558 } else {
559 return seastar::now();
560 }
561 });
11fdf7f2
TL
562}
563
9f95a23c
TL
564std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
565Client::get_supported_auth_methods(int peer_type)
566{
567 std::vector<uint32_t> methods;
568 std::vector<uint32_t> modes;
569 auth_registry.get_supported_methods(peer_type, &methods, &modes);
570 return {methods, modes};
571}
572
573uint32_t Client::pick_con_mode(int peer_type,
574 uint32_t auth_method,
575 const std::vector<uint32_t>& preferred_modes)
576{
577 return auth_registry.pick_mode(peer_type, auth_method, preferred_modes);
578}
579
580AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type,
581 int auth_method)
582{
583 return auth_registry.get_handler(peer_type, auth_method);
584}
585
586
1e59de90
TL
587int Client::handle_auth_request(crimson::net::Connection &conn,
588 AuthConnectionMeta &auth_meta,
9f95a23c
TL
589 bool more,
590 uint32_t auth_method,
591 const ceph::bufferlist& payload,
1e59de90 592 uint64_t *p_peer_global_id,
9f95a23c
TL
593 ceph::bufferlist *reply)
594{
9f95a23c 595 if (payload.length() == 0) {
1e59de90 596 return -EACCES;
9f95a23c 597 }
1e59de90
TL
598 auth_meta.auth_mode = payload[0];
599 if (auth_meta.auth_mode < AUTH_MODE_AUTHORIZER ||
600 auth_meta.auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
9f95a23c
TL
601 return -EACCES;
602 }
1e59de90 603 AuthAuthorizeHandler* ah = get_auth_authorize_handler(conn.get_peer_type(),
9f95a23c
TL
604 auth_method);
605 if (!ah) {
606 logger().error("no AuthAuthorizeHandler found for auth method: {}",
607 auth_method);
608 return -EOPNOTSUPP;
609 }
1e59de90
TL
610 auto authorizer_challenge = &auth_meta.authorizer_challenge;
611 if (auth_meta.skip_authorizer_challenge) {
612 logger().info("skipping challenge on {}", conn);
9f95a23c
TL
613 authorizer_challenge = nullptr;
614 }
20effc67
TL
615 if (!active_con) {
616 logger().info("auth request during inactivity period");
617 // let's instruct the client to come back later
618 return -EBUSY;
619 }
1e59de90 620 bool was_challenge = (bool)auth_meta.authorizer_challenge;
9f95a23c
TL
621 EntityName name;
622 AuthCapsInfo caps_info;
623 bool is_valid = ah->verify_authorizer(
624 &cct,
625 active_con->get_keys(),
626 payload,
1e59de90 627 auth_meta.get_connection_secret_length(),
9f95a23c
TL
628 reply,
629 &name,
1e59de90 630 p_peer_global_id,
9f95a23c 631 &caps_info,
1e59de90
TL
632 &auth_meta.session_key,
633 &auth_meta.connection_secret,
9f95a23c
TL
634 authorizer_challenge);
635 if (is_valid) {
636 auth_handler.handle_authentication(name, caps_info);
637 return 1;
638 }
1e59de90
TL
639 if (!more && !was_challenge && auth_meta.authorizer_challenge) {
640 logger().info("added challenge on {}", conn);
9f95a23c
TL
641 return 0;
642 } else {
1e59de90 643 logger().info("bad authorizer on {}", conn);
9f95a23c
TL
644 return -EACCES;
645 }
646}
647
648auth::AuthClient::auth_request_t
1e59de90
TL
649Client::get_auth_request(crimson::net::Connection &conn,
650 AuthConnectionMeta &auth_meta)
9f95a23c 651{
1e59de90
TL
652 logger().info("get_auth_request(conn={}, auth_method={})",
653 conn, auth_meta.auth_method);
9f95a23c 654 // connection to mon?
1e59de90 655 if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c 656 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
1e59de90 657 [peer_addr = conn.get_peer_addr()](auto& mc) {
9f95a23c
TL
658 return mc->is_my_peer(peer_addr);
659 });
660 if (found == pending_conns.end()) {
661 throw crimson::auth::error{"unknown connection"};
662 }
663 return (*found)->get_auth_request(entity_name, want_keys);
664 } else {
665 // generate authorizer
666 if (!active_con) {
667 logger().error(" but no auth handler is set up");
668 throw crimson::auth::error("no auth available");
669 }
1e59de90 670 auto authorizer = active_con->get_authorizer(conn.get_peer_type());
9f95a23c
TL
671 if (!authorizer) {
672 logger().error("failed to build_authorizer for type {}",
1e59de90 673 ceph_entity_type_name(conn.get_peer_type()));
9f95a23c
TL
674 throw crimson::auth::error("unable to build auth");
675 }
1e59de90
TL
676 auth_meta.authorizer.reset(authorizer);
677 auth_meta.auth_method = authorizer->protocol;
9f95a23c 678 vector<uint32_t> modes;
1e59de90
TL
679 auth_registry.get_supported_modes(conn.get_peer_type(),
680 auth_meta.auth_method,
9f95a23c
TL
681 &modes);
682 return {authorizer->protocol, modes, authorizer->bl};
683 }
684}
685
1e59de90
TL
686ceph::bufferlist Client::handle_auth_reply_more(crimson::net::Connection &conn,
687 AuthConnectionMeta &auth_meta,
9f95a23c
TL
688 const bufferlist& bl)
689{
1e59de90 690 if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c 691 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
1e59de90 692 [peer_addr = conn.get_peer_addr()](auto& mc) {
9f95a23c
TL
693 return mc->is_my_peer(peer_addr);
694 });
695 if (found == pending_conns.end()) {
696 throw crimson::auth::error{"unknown connection"};
697 }
698 bufferlist reply;
1e59de90 699 tie(auth_meta.session_key, auth_meta.connection_secret, reply) =
9f95a23c
TL
700 (*found)->handle_auth_reply_more(bl);
701 return reply;
702 } else {
703 // authorizer challenges
1e59de90 704 if (!active_con || !auth_meta.authorizer) {
9f95a23c
TL
705 logger().error("no authorizer?");
706 throw crimson::auth::error("no auth available");
707 }
1e59de90
TL
708 auth_meta.authorizer->add_challenge(&cct, bl);
709 return auth_meta.authorizer->bl;
9f95a23c
TL
710 }
711}
712
1e59de90
TL
713int Client::handle_auth_done(crimson::net::Connection &conn,
714 AuthConnectionMeta &auth_meta,
9f95a23c 715 uint64_t global_id,
20effc67 716 uint32_t /*con_mode*/,
9f95a23c
TL
717 const bufferlist& bl)
718{
1e59de90 719 if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c 720 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
1e59de90 721 [peer_addr = conn.get_peer_addr()](auto& mc) {
9f95a23c
TL
722 return mc->is_my_peer(peer_addr);
723 });
724 if (found == pending_conns.end()) {
725 return -ENOENT;
726 }
727 int r = 0;
1e59de90 728 tie(auth_meta.session_key, auth_meta.connection_secret, r) =
9f95a23c
TL
729 (*found)->handle_auth_done(global_id, bl);
730 return r;
731 } else {
732 // verify authorizer reply
733 auto p = bl.begin();
1e59de90 734 if (!auth_meta.authorizer->verify_reply(p, &auth_meta.connection_secret)) {
9f95a23c
TL
735 logger().error("failed verifying authorizer reply");
736 return -EACCES;
737 }
1e59de90 738 auth_meta.session_key = auth_meta.authorizer->session_key;
9f95a23c
TL
739 return 0;
740 }
741}
742
743 // Handle server's indication that the previous auth attempt failed
1e59de90
TL
744int Client::handle_auth_bad_method(crimson::net::Connection &conn,
745 AuthConnectionMeta &auth_meta,
9f95a23c
TL
746 uint32_t old_auth_method,
747 int result,
748 const std::vector<uint32_t>& allowed_methods,
749 const std::vector<uint32_t>& allowed_modes)
750{
1e59de90 751 if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c 752 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
1e59de90 753 [peer_addr = conn.get_peer_addr()](auto& mc) {
9f95a23c
TL
754 return mc->is_my_peer(peer_addr);
755 });
756 if (found != pending_conns.end()) {
757 return (*found)->handle_auth_bad_method(
758 old_auth_method, result,
759 allowed_methods, allowed_modes);
760 } else {
761 return -ENOENT;
762 }
763 } else {
764 // huh...
765 logger().info("hmm, they didn't like {} result {}",
766 old_auth_method, cpp_strerror(result));
767 return -EACCES;
768 }
769}
770
1e59de90 771seastar::future<> Client::handle_monmap(crimson::net::Connection &conn,
11fdf7f2
TL
772 Ref<MMonMap> m)
773{
774 monmap.decode(m->monmapbl);
1e59de90 775 const auto peer_addr = conn.get_peer_addr();
11fdf7f2
TL
776 auto cur_mon = monmap.get_name(peer_addr);
777 logger().info("got monmap {}, mon.{}, is now rank {}",
778 monmap.epoch, cur_mon, monmap.get_rank(cur_mon));
779 sub.got("monmap", monmap.get_epoch());
780
781 if (monmap.get_addr_name(peer_addr, cur_mon)) {
9f95a23c
TL
782 if (active_con) {
783 logger().info("handle_monmap: renewing tickets");
784 return seastar::when_all_succeed(
785 active_con->renew_tickets(),
1e59de90 786 active_con->renew_rotating_keyring()).then_unpack([] {
9f95a23c
TL
787 logger().info("handle_mon_map: renewed tickets");
788 });
789 } else {
790 return seastar::now();
791 }
11fdf7f2
TL
792 } else {
793 logger().warn("mon.{} went away", cur_mon);
20effc67
TL
794 return reopen_session(-1).then([this](bool opened) {
795 if (opened) {
796 return on_session_opened();
797 } else {
798 return seastar::now();
799 }
f67539c2 800 });
11fdf7f2
TL
801 }
802}
803
1e59de90 804seastar::future<> Client::handle_auth_reply(crimson::net::Connection &conn,
f67539c2 805 Ref<MAuthReply> m)
11fdf7f2 806{
1e59de90
TL
807 logger().info("handle_auth_reply {} returns {}: {}",
808 conn, *m, m->result);
11fdf7f2 809 auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
1e59de90 810 [peer_addr = conn.get_peer_addr()](auto& mc) {
9f95a23c 811 return mc->is_my_peer(peer_addr);
11fdf7f2
TL
812 });
813 if (found != pending_conns.end()) {
9f95a23c 814 return (*found)->handle_auth_reply(m);
11fdf7f2 815 } else if (active_con) {
1e59de90
TL
816 return active_con->handle_auth_reply(m).then([this] {
817 return seastar::when_all_succeed(
818 active_con->renew_rotating_keyring(),
819 active_con->renew_tickets()).discard_result();
820 });
11fdf7f2 821 } else {
1e59de90 822 logger().error("unknown auth reply from {}", conn.get_peer_addr());
11fdf7f2
TL
823 return seastar::now();
824 }
825}
826
827seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m)
828{
829 sub.acked(m->interval);
830 return seastar::now();
831}
832
833Client::get_version_t Client::get_version(const std::string& map)
834{
20effc67 835 auto m = crimson::make_message<MMonGetVersion>();
11fdf7f2
TL
836 auto tid = ++last_version_req_id;
837 m->handle = tid;
838 m->what = map;
839 auto& req = version_reqs[tid];
20effc67 840 return send_message(std::move(m)).then([&req] {
11fdf7f2
TL
841 return req.get_future();
842 });
843}
844
845seastar::future<>
846Client::handle_get_version_reply(Ref<MMonGetVersionReply> m)
847{
848 if (auto found = version_reqs.find(m->handle);
849 found != version_reqs.end()) {
850 auto& result = found->second;
851 logger().trace("{}: {} returns {}",
852 __func__, m->handle, m->version);
f67539c2 853 result.set_value(std::make_tuple(m->version, m->oldest_version));
11fdf7f2
TL
854 version_reqs.erase(found);
855 } else {
856 logger().warn("{}: version request with handle {} not found",
857 __func__, m->handle);
858 }
859 return seastar::now();
860}
861
862seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m)
863{
864 const auto tid = m->get_tid();
20effc67
TL
865 if (auto found = std::find_if(mon_commands.begin(),
866 mon_commands.end(),
867 [tid](auto& cmd) {
868 return cmd.req->get_tid() == tid;
869 });
11fdf7f2 870 found != mon_commands.end()) {
20effc67 871 auto& command = *found;
11fdf7f2 872 logger().trace("{} {}", __func__, tid);
20effc67 873 command.result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data())));
11fdf7f2
TL
874 mon_commands.erase(found);
875 } else {
876 logger().warn("{} {} not found", __func__, tid);
877 }
878 return seastar::now();
879}
880
881seastar::future<> Client::handle_log_ack(Ref<MLogAck> m)
882{
20effc67
TL
883 if (log_client) {
884 return log_client->handle_log_ack(m).then([this] {
885 if (more_log_pending) {
886 return send_log(log_flushing_t::NO_FLUSH);
887 } else {
888 return seastar::now();
889 }
890 });
891 }
11fdf7f2
TL
892 return seastar::now();
893}
894
895seastar::future<> Client::handle_config(Ref<MConfig> m)
896{
20effc67
TL
897 return crimson::common::local_conf().set_mon_vals(m->config).then([this] {
898 if (config_updated) {
899 config_updated->set_value();
900 }
901 });
11fdf7f2
TL
902}
903
904std::vector<unsigned> Client::get_random_mons(unsigned n) const
905{
906 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
907 for (const auto& m : monmap.mon_info) {
908 if (m.second.priority < min_priority) {
909 min_priority = m.second.priority;
910 }
911 }
912 vector<unsigned> ranks;
913 for (auto [name, info] : monmap.mon_info) {
11fdf7f2
TL
914 if (info.priority == min_priority) {
915 ranks.push_back(monmap.get_rank(name));
916 }
917 }
918 std::random_device rd;
919 std::default_random_engine rng{rd()};
920 std::shuffle(ranks.begin(), ranks.end(), rng);
921 if (n == 0 || n > ranks.size()) {
922 return ranks;
923 } else {
924 return {ranks.begin(), ranks.begin() + n};
925 }
926}
927
928seastar::future<> Client::authenticate()
929{
20effc67
TL
930 return reopen_session(-1).then([this](bool opened) {
931 if (opened) {
932 return on_session_opened();
933 } else {
934 return seastar::now();
935 }
f67539c2 936 });
11fdf7f2
TL
937}
938
939seastar::future<> Client::stop()
940{
f67539c2
TL
941 logger().info("{}", __func__);
942 auto fut = gate.close();
943 timer.cancel();
20effc67 944 ready_to_send = false;
f67539c2
TL
945 for (auto& pending_con : pending_conns) {
946 pending_con->close();
947 }
948 if (active_con) {
949 active_con->close();
950 }
951 return fut;
11fdf7f2
TL
952}
953
20effc67
TL
954static entity_addr_t choose_client_addr(
955 const entity_addrvec_t& my_addrs,
956 const entity_addrvec_t& client_addrs)
957{
958 // here is where we decide which of the addrs to connect to. always prefer
959 // the first one, if we support it.
960 for (const auto& a : client_addrs.v) {
961 if (a.is_msgr2()) {
962 // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
963 // trying it? for now, just pick whichever is listed first.
964 return a;
965 }
966 }
967 return entity_addr_t{};
968}
969
970seastar::future<bool> Client::reopen_session(int rank)
11fdf7f2 971{
9f95a23c 972 logger().info("{} to mon.{}", __func__, rank);
20effc67
TL
973 ready_to_send = false;
974 if (active_con) {
975 active_con->close();
976 active_con = nullptr;
977 ceph_assert(pending_conns.empty());
978 } else {
979 for (auto& pending_con : pending_conns) {
980 pending_con->close();
981 }
982 pending_conns.clear();
983 }
11fdf7f2
TL
984 vector<unsigned> mons;
985 if (rank >= 0) {
986 mons.push_back(rank);
987 } else {
988 const auto parallel =
9f95a23c 989 crimson::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel");
11fdf7f2
TL
990 mons = get_random_mons(parallel);
991 }
992 pending_conns.reserve(mons.size());
993 return seastar::parallel_for_each(mons, [this](auto rank) {
20effc67
TL
994 auto peer = choose_client_addr(msgr.get_myaddrs(),
995 monmap.get_addrs(rank));
f67539c2
TL
996 if (peer == entity_addr_t{}) {
997 // crimson msgr only uses the first bound addr
998 logger().warn("mon.{} does not have an addr compatible with me", rank);
999 return seastar::now();
1000 }
11fdf7f2 1001 logger().info("connecting to mon.{}", rank);
20effc67
TL
1002 auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
1003 auto& mc = pending_conns.emplace_back(
1004 seastar::make_shared<Connection>(auth_registry, conn, &keyring));
1005 assert(conn->get_peer_addr().is_msgr2());
1006 return mc->authenticate_v2().then([peer, this](auto result) {
f67539c2
TL
1007 if (result == Connection::auth_result_t::success) {
1008 _finish_auth(peer);
9f95a23c 1009 }
9f95a23c
TL
1010 logger().debug("reopen_session mon connection attempts complete");
1011 }).handle_exception([](auto ep) {
1012 logger().error("mon connections failed with ep {}", ep);
1013 return seastar::make_exception_future(ep);
11fdf7f2
TL
1014 });
1015 }).then([this] {
20effc67
TL
1016 if (active_con) {
1017 return true;
1018 } else {
1019 logger().warn("cannot establish the active_con with any mon");
1020 return false;
f67539c2 1021 }
11fdf7f2
TL
1022 });
1023}
1024
f67539c2
TL
1025void Client::_finish_auth(const entity_addr_t& peer)
1026{
1027 if (!is_hunting()) {
1028 return;
1029 }
1030 logger().info("found mon.{}", monmap.get_name(peer));
1031
1032 auto found = std::find_if(
1033 pending_conns.begin(), pending_conns.end(),
1034 [peer](auto& conn) {
1035 return conn->is_my_peer(peer);
1036 });
1037 if (found == pending_conns.end()) {
1038 // Happens if another connection has won the race
1039 ceph_assert(active_con && pending_conns.empty());
1040 logger().info("no pending connection for mon.{}, peer {}",
1041 monmap.get_name(peer), peer);
1042 return;
1043 }
1044
1045 ceph_assert(!active_con && !pending_conns.empty());
20effc67
TL
1046 // It's too early to toggle the `ready_to_send` flag. It will
1047 // be set atfer finishing the MAuth exchange and draining out
1048 // the `pending_messages` queue.
f67539c2 1049 active_con = std::move(*found);
20effc67 1050 *found = nullptr;
f67539c2
TL
1051 for (auto& conn : pending_conns) {
1052 if (conn) {
1053 conn->close();
1054 }
1055 }
1056 pending_conns.clear();
1057}
1058
11fdf7f2 1059Client::command_result_t
20effc67
TL
1060Client::run_command(std::string&& cmd,
1061 bufferlist&& bl)
11fdf7f2 1062{
20effc67 1063 auto m = crimson::make_message<MMonCommand>(monmap.fsid);
11fdf7f2
TL
1064 auto tid = ++last_mon_command_id;
1065 m->set_tid(tid);
20effc67
TL
1066 m->cmd = {std::move(cmd)};
1067 m->set_data(std::move(bl));
1e59de90 1068 auto& command = mon_commands.emplace_back(crimson::make_message<MMonCommand>(*m));
20effc67
TL
1069 return send_message(std::move(m)).then([&result=command.result] {
1070 return result.get_future();
11fdf7f2
TL
1071 });
1072}
1073
20effc67 1074seastar::future<> Client::send_message(MessageURef m)
11fdf7f2 1075{
20effc67
TL
1076 if (active_con && ready_to_send) {
1077 assert(pending_messages.empty());
1078 return active_con->get_conn()->send(std::move(m));
1079 } else {
1080 auto& delayed = pending_messages.emplace_back(std::move(m));
1081 return delayed.pr.get_future();
f67539c2 1082 }
f67539c2
TL
1083}
1084
20effc67 1085seastar::future<> Client::on_session_opened()
f67539c2 1086{
20effc67
TL
1087 return active_con->renew_rotating_keyring().then([this] {
1088 if (!active_con) {
1089 // the connection can be closed even in the middle of the opening sequence
1090 logger().info("on_session_opened {}: connection closed", __LINE__);
1091 return seastar::now();
1092 }
f67539c2 1093 for (auto& m : pending_messages) {
20effc67 1094 (void) active_con->get_conn()->send(std::move(m.msg));
f67539c2
TL
1095 m.pr.set_value();
1096 }
1097 pending_messages.clear();
20effc67
TL
1098 ready_to_send = true;
1099 return sub.reload() ? renew_subs() : seastar::now();
1100 }).then([this] {
1101 if (!active_con) {
1102 logger().info("on_session_opened {}: connection closed", __LINE__);
1103 return seastar::now();
1104 }
1105 return seastar::parallel_for_each(mon_commands,
1106 [this](auto &command) {
1107 return send_message(crimson::make_message<MMonCommand>(*command.req));
1108 });
1109 });
11fdf7f2
TL
1110}
1111
1112bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
1113{
1114 return sub.want(what, start, flags);
1115}
1116
1117void Client::sub_got(const std::string& what, version_t have)
1118{
1119 sub.got(what, have);
1120}
1121
1122void Client::sub_unwant(const std::string& what)
1123{
1124 sub.unwant(what);
1125}
1126
1127bool Client::sub_want_increment(const std::string& what,
1128 version_t start,
1129 unsigned flags)
1130{
1131 return sub.inc_want(what, start, flags);
1132}
1133
1134seastar::future<> Client::renew_subs()
1135{
1136 if (!sub.have_new()) {
1137 logger().warn("{} - empty", __func__);
1138 return seastar::now();
1139 }
1140 logger().trace("{}", __func__);
1141
20effc67 1142 auto m = crimson::make_message<MMonSubscribe>();
11fdf7f2
TL
1143 m->what = sub.get_subs();
1144 m->hostname = ceph_get_short_hostname();
20effc67 1145 return send_message(std::move(m)).then([this] {
11fdf7f2
TL
1146 sub.renewed();
1147 });
1148}
1149
20effc67
TL
1150seastar::future<> Client::wait_for_config()
1151{
1152 assert(!config_updated);
1153 config_updated = seastar::promise<>();
1154 return config_updated->get_future();
1155}
1156
f67539c2
TL
1157void Client::print(std::ostream& out) const
1158{
1159 out << "mon." << entity_name;
1160}
1161
9f95a23c 1162} // namespace crimson::mon