]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
11fdf7f2 TL |
4 | #include "MonClient.h" |
5 | ||
6 | #include <random> | |
1e59de90 | 7 | #include <fmt/ranges.h> |
11fdf7f2 TL |
8 | #include <seastar/core/future-util.hh> |
9 | #include <seastar/core/lowres_clock.hh> | |
9f95a23c | 10 | #include <seastar/core/shared_future.hh> |
11fdf7f2 TL |
11 | #include <seastar/util/log.hh> |
12 | ||
13 | #include "auth/AuthClientHandler.h" | |
11fdf7f2 TL |
14 | #include "auth/RotatingKeyRing.h" |
15 | ||
16 | #include "common/hostname.h" | |
17 | ||
18 | #include "crimson/auth/KeyRing.h" | |
19 | #include "crimson/common/config_proxy.h" | |
20 | #include "crimson/common/log.h" | |
20effc67 | 21 | #include "crimson/common/logclient.h" |
11fdf7f2 TL |
22 | #include "crimson/net/Connection.h" |
23 | #include "crimson/net/Errors.h" | |
24 | #include "crimson/net/Messenger.h" | |
25 | ||
26 | #include "messages/MAuth.h" | |
27 | #include "messages/MAuthReply.h" | |
28 | #include "messages/MConfig.h" | |
29 | #include "messages/MLogAck.h" | |
30 | #include "messages/MMonCommand.h" | |
31 | #include "messages/MMonCommandAck.h" | |
9f95a23c | 32 | #include "messages/MMonGetMap.h" |
11fdf7f2 TL |
33 | #include "messages/MMonGetVersion.h" |
34 | #include "messages/MMonGetVersionReply.h" | |
35 | #include "messages/MMonMap.h" | |
36 | #include "messages/MMonSubscribe.h" | |
37 | #include "messages/MMonSubscribeAck.h" | |
38 | ||
20effc67 TL |
39 | using std::string; |
40 | using std::tuple; | |
41 | using std::vector; | |
42 | ||
11fdf7f2 TL |
43 | namespace { |
44 | seastar::logger& logger() | |
45 | { | |
9f95a23c | 46 | return crimson::get_logger(ceph_subsys_monc); |
11fdf7f2 TL |
47 | } |
48 | } | |
49 | ||
9f95a23c | 50 | namespace crimson::mon { |
11fdf7f2 | 51 | |
9f95a23c | 52 | using crimson::common::local_conf; |
11fdf7f2 | 53 | |
20effc67 | 54 | class Connection : public seastar::enable_shared_from_this<Connection> { |
11fdf7f2 | 55 | public: |
9f95a23c TL |
56 | Connection(const AuthRegistry& auth_registry, |
57 | crimson::net::ConnectionRef conn, | |
58 | KeyRing* keyring); | |
f67539c2 | 59 | enum class auth_result_t { |
9f95a23c TL |
60 | success = 0, |
61 | failure, | |
62 | canceled | |
63 | }; | |
11fdf7f2 | 64 | seastar::future<> handle_auth_reply(Ref<MAuthReply> m); |
9f95a23c | 65 | // v2 |
f67539c2 | 66 | seastar::future<auth_result_t> authenticate_v2(); |
9f95a23c TL |
67 | auth::AuthClient::auth_request_t |
68 | get_auth_request(const EntityName& name, | |
69 | uint32_t want_keys); | |
70 | using secret_t = string; | |
71 | tuple<CryptoKey, secret_t, bufferlist> | |
72 | handle_auth_reply_more(const ceph::buffer::list& bl); | |
9f95a23c TL |
73 | int handle_auth_bad_method(uint32_t old_auth_method, |
74 | int result, | |
75 | const std::vector<uint32_t>& allowed_methods, | |
76 | const std::vector<uint32_t>& allowed_modes); | |
f67539c2 TL |
77 | tuple<CryptoKey, secret_t, int> |
78 | handle_auth_done(uint64_t new_global_id, | |
79 | const ceph::buffer::list& bl); | |
80 | void close(); | |
11fdf7f2 | 81 | bool is_my_peer(const entity_addr_t& addr) const; |
9f95a23c TL |
82 | AuthAuthorizer* get_authorizer(entity_type_t peer) const; |
83 | KeyStore& get_keys(); | |
11fdf7f2 | 84 | seastar::future<> renew_tickets(); |
9f95a23c TL |
85 | seastar::future<> renew_rotating_keyring(); |
86 | ||
87 | crimson::net::ConnectionRef get_conn(); | |
11fdf7f2 TL |
88 | |
89 | private: | |
9f95a23c TL |
90 | std::unique_ptr<AuthClientHandler> create_auth(crimson::auth::method_t, |
91 | uint64_t global_id, | |
92 | const EntityName& name, | |
93 | uint32_t want_keys); | |
94 | enum class request_t { | |
95 | rotating, | |
96 | general, | |
97 | }; | |
f67539c2 TL |
98 | seastar::future<std::optional<auth_result_t>> do_auth_single(request_t); |
99 | seastar::future<auth_result_t> do_auth(request_t); | |
11fdf7f2 TL |
100 | |
101 | private: | |
102 | bool closed = false; | |
20effc67 | 103 | seastar::shared_promise<Ref<MAuthReply>> auth_reply; |
9f95a23c TL |
104 | // v2 |
105 | using clock_t = seastar::lowres_system_clock; | |
106 | clock_t::time_point auth_start; | |
107 | crimson::auth::method_t auth_method = 0; | |
f67539c2 | 108 | std::optional<seastar::promise<auth_result_t>> auth_done; |
9f95a23c TL |
109 | const AuthRegistry& auth_registry; |
110 | crimson::net::ConnectionRef conn; | |
11fdf7f2 | 111 | std::unique_ptr<AuthClientHandler> auth; |
9f95a23c TL |
112 | std::unique_ptr<RotatingKeyRing> rotating_keyring; |
113 | uint64_t global_id = 0; | |
114 | clock_t::time_point last_rotating_renew_sent; | |
11fdf7f2 TL |
115 | }; |
116 | ||
9f95a23c TL |
117 | Connection::Connection(const AuthRegistry& auth_registry, |
118 | crimson::net::ConnectionRef conn, | |
11fdf7f2 | 119 | KeyRing* keyring) |
9f95a23c TL |
120 | : auth_registry{auth_registry}, |
121 | conn{conn}, | |
122 | rotating_keyring{ | |
123 | std::make_unique<RotatingKeyRing>(nullptr, | |
124 | CEPH_ENTITY_TYPE_OSD, | |
125 | keyring)} | |
11fdf7f2 TL |
126 | {} |
127 | ||
128 | seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m) | |
129 | { | |
20effc67 TL |
130 | logger().info("{}", __func__); |
131 | ceph_assert(m); | |
132 | auth_reply.set_value(m); | |
133 | auth_reply = {}; | |
11fdf7f2 TL |
134 | return seastar::now(); |
135 | } | |
136 | ||
137 | seastar::future<> Connection::renew_tickets() | |
138 | { | |
139 | if (auth->need_tickets()) { | |
1e59de90 | 140 | logger().info("{}: retrieving new tickets", __func__); |
20effc67 TL |
141 | return do_auth(request_t::general).then([](const auth_result_t r) { |
142 | if (r == auth_result_t::failure) { | |
1e59de90 | 143 | logger().info("renew_tickets: ignoring failed auth reply"); |
11fdf7f2 TL |
144 | } |
145 | }); | |
1e59de90 TL |
146 | } else { |
147 | logger().debug("{}: don't need new tickets", __func__); | |
148 | return seastar::now(); | |
11fdf7f2 | 149 | } |
11fdf7f2 TL |
150 | } |
151 | ||
9f95a23c TL |
152 | seastar::future<> Connection::renew_rotating_keyring() |
153 | { | |
154 | auto now = clock_t::now(); | |
155 | auto ttl = std::chrono::seconds{ | |
156 | static_cast<long>(crimson::common::local_conf()->auth_service_ticket_ttl)}; | |
1e59de90 TL |
157 | auto cutoff = utime_t{now - std::min(std::chrono::seconds{30}, ttl / 4)}; |
158 | if (!rotating_keyring->need_new_secrets(cutoff)) { | |
159 | logger().debug("renew_rotating_keyring secrets are up-to-date " | |
160 | "(they expire after {})", cutoff); | |
9f95a23c | 161 | return seastar::now(); |
1e59de90 TL |
162 | } else { |
163 | logger().info("renew_rotating_keyring renewing rotating keys " | |
164 | " (they expired before {})", cutoff); | |
9f95a23c | 165 | } |
1e59de90 TL |
166 | if ((now > last_rotating_renew_sent) && |
167 | (now - last_rotating_renew_sent < std::chrono::seconds{1})) { | |
168 | logger().info("renew_rotating_keyring called too often (last: {})", | |
169 | utime_t{last_rotating_renew_sent}); | |
9f95a23c TL |
170 | return seastar::now(); |
171 | } | |
172 | last_rotating_renew_sent = now; | |
20effc67 TL |
173 | return do_auth(request_t::rotating).then([](const auth_result_t r) { |
174 | if (r == auth_result_t::failure) { | |
1e59de90 | 175 | logger().info("renew_rotating_keyring: ignoring failed auth reply"); |
9f95a23c TL |
176 | } |
177 | }); | |
178 | } | |
179 | ||
180 | AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const | |
181 | { | |
182 | if (auth) { | |
183 | return auth->build_authorizer(peer); | |
184 | } else { | |
185 | return nullptr; | |
186 | } | |
187 | } | |
188 | ||
189 | KeyStore& Connection::get_keys() { | |
190 | return *rotating_keyring; | |
191 | } | |
192 | ||
11fdf7f2 | 193 | std::unique_ptr<AuthClientHandler> |
9f95a23c TL |
194 | Connection::create_auth(crimson::auth::method_t protocol, |
195 | uint64_t global_id, | |
11fdf7f2 TL |
196 | const EntityName& name, |
197 | uint32_t want_keys) | |
198 | { | |
9f95a23c | 199 | static crimson::common::CephContext cct; |
11fdf7f2 TL |
200 | std::unique_ptr<AuthClientHandler> auth; |
201 | auth.reset(AuthClientHandler::create(&cct, | |
9f95a23c TL |
202 | protocol, |
203 | rotating_keyring.get())); | |
11fdf7f2 | 204 | if (!auth) { |
9f95a23c | 205 | logger().error("no handler for protocol {}", protocol); |
11fdf7f2 | 206 | throw std::system_error(make_error_code( |
9f95a23c | 207 | crimson::net::error::negotiation_failure)); |
11fdf7f2 TL |
208 | } |
209 | auth->init(name); | |
210 | auth->set_want_keys(want_keys); | |
211 | auth->set_global_id(global_id); | |
11fdf7f2 TL |
212 | return auth; |
213 | } | |
214 | ||
f67539c2 | 215 | seastar::future<std::optional<Connection::auth_result_t>> |
9f95a23c | 216 | Connection::do_auth_single(Connection::request_t what) |
11fdf7f2 | 217 | { |
20effc67 | 218 | auto m = crimson::make_message<MAuth>(); |
11fdf7f2 TL |
219 | m->protocol = auth->get_protocol(); |
220 | auth->prepare_build_request(); | |
9f95a23c TL |
221 | switch (what) { |
222 | case request_t::rotating: | |
223 | auth->build_rotating_request(m->auth_payload); | |
224 | break; | |
225 | case request_t::general: | |
226 | if (int ret = auth->build_request(m->auth_payload); ret) { | |
227 | logger().error("missing/bad key for '{}'", local_conf()->name); | |
228 | throw std::system_error(make_error_code( | |
229 | crimson::net::error::negotiation_failure)); | |
230 | } | |
231 | break; | |
232 | default: | |
233 | assert(0); | |
11fdf7f2 TL |
234 | } |
235 | logger().info("sending {}", *m); | |
20effc67 | 236 | return conn->send(std::move(m)).then([this] { |
11fdf7f2 | 237 | logger().info("waiting"); |
20effc67 TL |
238 | return auth_reply.get_shared_future(); |
239 | }).then([this, life_extender=shared_from_this()] (Ref<MAuthReply> m) { | |
9f95a23c TL |
240 | if (!m) { |
241 | ceph_assert(closed); | |
20effc67 TL |
242 | logger().info("do_auth_single: connection closed"); |
243 | return std::make_optional(auth_result_t::canceled); | |
9f95a23c | 244 | } |
1e59de90 TL |
245 | logger().info("do_auth_single: {} returns {}: {}", |
246 | *conn, *m, m->result); | |
11fdf7f2 TL |
247 | auto p = m->result_bl.cbegin(); |
248 | auto ret = auth->handle_response(m->result, p, | |
11fdf7f2 | 249 | nullptr, nullptr); |
20effc67 TL |
250 | std::optional<Connection::auth_result_t> auth_result; |
251 | switch (ret) { | |
252 | case -EAGAIN: | |
253 | auth_result = std::nullopt; | |
254 | break; | |
255 | case 0: | |
256 | auth_result = auth_result_t::success; | |
257 | break; | |
258 | default: | |
259 | auth_result = auth_result_t::failure; | |
9f95a23c | 260 | logger().error( |
20effc67 TL |
261 | "do_auth_single: got error {} on mon {}", |
262 | ret, conn->get_peer_addr()); | |
263 | break; | |
11fdf7f2 | 264 | } |
20effc67 | 265 | return auth_result; |
11fdf7f2 TL |
266 | }); |
267 | } | |
268 | ||
f67539c2 | 269 | seastar::future<Connection::auth_result_t> |
9f95a23c | 270 | Connection::do_auth(Connection::request_t what) { |
20effc67 TL |
271 | return seastar::repeat_until_value( |
272 | [this, life_extender=shared_from_this(), what]() { | |
9f95a23c TL |
273 | return do_auth_single(what); |
274 | }); | |
275 | } | |
276 | ||
f67539c2 | 277 | seastar::future<Connection::auth_result_t> Connection::authenticate_v2() |
9f95a23c TL |
278 | { |
279 | auth_start = seastar::lowres_system_clock::now(); | |
20effc67 | 280 | return conn->send(crimson::make_message<MMonGetMap>()).then([this] { |
f67539c2 TL |
281 | auth_done.emplace(); |
282 | return auth_done->get_future(); | |
11fdf7f2 TL |
283 | }); |
284 | } | |
285 | ||
9f95a23c TL |
286 | auth::AuthClient::auth_request_t |
287 | Connection::get_auth_request(const EntityName& entity_name, | |
288 | uint32_t want_keys) | |
289 | { | |
290 | // choose method | |
291 | auth_method = [&] { | |
292 | std::vector<crimson::auth::method_t> methods; | |
293 | auth_registry.get_supported_methods(conn->get_peer_type(), &methods); | |
294 | if (methods.empty()) { | |
295 | logger().info("get_auth_request no methods is supported"); | |
296 | throw crimson::auth::error("no methods is supported"); | |
297 | } | |
298 | return methods.front(); | |
299 | }(); | |
300 | ||
301 | std::vector<uint32_t> modes; | |
302 | auth_registry.get_supported_modes(conn->get_peer_type(), auth_method, | |
303 | &modes); | |
304 | logger().info("method {} preferred_modes {}", auth_method, modes); | |
305 | if (modes.empty()) { | |
306 | throw crimson::auth::error("no modes is supported"); | |
307 | } | |
308 | auth = create_auth(auth_method, global_id, entity_name, want_keys); | |
309 | ||
310 | using ceph::encode; | |
311 | bufferlist bl; | |
312 | // initial request includes some boilerplate... | |
313 | encode((char)AUTH_MODE_MON, bl); | |
314 | encode(entity_name, bl); | |
315 | encode(global_id, bl); | |
316 | // and (maybe) some method-specific initial payload | |
317 | auth->build_initial_request(&bl); | |
318 | return {auth_method, modes, bl}; | |
319 | } | |
320 | ||
321 | tuple<CryptoKey, Connection::secret_t, bufferlist> | |
322 | Connection::handle_auth_reply_more(const ceph::buffer::list& payload) | |
323 | { | |
324 | CryptoKey session_key; | |
325 | secret_t connection_secret; | |
326 | bufferlist reply; | |
327 | auto p = payload.cbegin(); | |
328 | int r = auth->handle_response(0, p, &session_key, &connection_secret); | |
329 | if (r == -EAGAIN) { | |
330 | auth->prepare_build_request(); | |
331 | auth->build_request(reply); | |
332 | logger().info(" responding with {} bytes", reply.length()); | |
333 | return {session_key, connection_secret, reply}; | |
334 | } else if (r < 0) { | |
335 | logger().error(" handle_response returned {}", r); | |
336 | throw crimson::auth::error("unable to build auth"); | |
337 | } else { | |
338 | logger().info("authenticated!"); | |
339 | std::terminate(); | |
340 | } | |
341 | } | |
342 | ||
343 | tuple<CryptoKey, Connection::secret_t, int> | |
344 | Connection::handle_auth_done(uint64_t new_global_id, | |
345 | const ceph::buffer::list& payload) | |
346 | { | |
347 | global_id = new_global_id; | |
348 | auth->set_global_id(global_id); | |
349 | auto p = payload.begin(); | |
350 | CryptoKey session_key; | |
351 | secret_t connection_secret; | |
352 | int r = auth->handle_response(0, p, &session_key, &connection_secret); | |
353 | conn->set_last_keepalive_ack(auth_start); | |
f67539c2 TL |
354 | if (auth_done) { |
355 | auth_done->set_value(auth_result_t::success); | |
356 | auth_done.reset(); | |
357 | } | |
9f95a23c TL |
358 | return {session_key, connection_secret, r}; |
359 | } | |
360 | ||
361 | int Connection::handle_auth_bad_method(uint32_t old_auth_method, | |
362 | int result, | |
363 | const std::vector<uint32_t>& allowed_methods, | |
364 | const std::vector<uint32_t>& allowed_modes) | |
365 | { | |
366 | logger().info("old_auth_method {} result {} allowed_methods {}", | |
367 | old_auth_method, cpp_strerror(result), allowed_methods); | |
368 | std::vector<uint32_t> auth_supported; | |
369 | auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported); | |
370 | auto p = std::find(auth_supported.begin(), auth_supported.end(), | |
371 | old_auth_method); | |
372 | assert(p != auth_supported.end()); | |
373 | p = std::find_first_of(std::next(p), auth_supported.end(), | |
374 | allowed_methods.begin(), allowed_methods.end()); | |
375 | if (p == auth_supported.end()) { | |
376 | logger().error("server allowed_methods {} but i only support {}", | |
377 | allowed_methods, auth_supported); | |
f67539c2 TL |
378 | assert(auth_done); |
379 | auth_done->set_exception(std::system_error(make_error_code( | |
9f95a23c TL |
380 | crimson::net::error::negotiation_failure))); |
381 | return -EACCES; | |
382 | } | |
383 | auth_method = *p; | |
384 | logger().info("will try {} next", auth_method); | |
385 | return 0; | |
386 | } | |
387 | ||
f67539c2 | 388 | void Connection::close() |
11fdf7f2 | 389 | { |
20effc67 TL |
390 | logger().info("{}", __func__); |
391 | auth_reply.set_value(Ref<MAuthReply>(nullptr)); | |
392 | auth_reply = {}; | |
f67539c2 TL |
393 | if (auth_done) { |
394 | auth_done->set_value(auth_result_t::canceled); | |
395 | auth_done.reset(); | |
396 | } | |
11fdf7f2 | 397 | if (conn && !std::exchange(closed, true)) { |
f67539c2 | 398 | conn->mark_down(); |
11fdf7f2 TL |
399 | } |
400 | } | |
401 | ||
402 | bool Connection::is_my_peer(const entity_addr_t& addr) const | |
403 | { | |
9f95a23c | 404 | ceph_assert(conn); |
11fdf7f2 TL |
405 | return conn->get_peer_addr() == addr; |
406 | } | |
407 | ||
9f95a23c | 408 | crimson::net::ConnectionRef Connection::get_conn() { |
11fdf7f2 TL |
409 | return conn; |
410 | } | |
411 | ||
1e59de90 TL |
412 | Client::mon_command_t::mon_command_t(MURef<MMonCommand> req) |
413 | : req(std::move(req)) | |
20effc67 TL |
414 | {} |
415 | ||
9f95a23c TL |
416 | Client::Client(crimson::net::Messenger& messenger, |
417 | crimson::common::AuthHandler& auth_handler) | |
11fdf7f2 TL |
418 | // currently, crimson is OSD-only |
419 | : want_keys{CEPH_ENTITY_TYPE_MON | | |
420 | CEPH_ENTITY_TYPE_OSD | | |
421 | CEPH_ENTITY_TYPE_MGR}, | |
422 | timer{[this] { tick(); }}, | |
9f95a23c | 423 | msgr{messenger}, |
20effc67 | 424 | log_client{nullptr}, |
9f95a23c TL |
425 | auth_registry{&cct}, |
426 | auth_handler{auth_handler} | |
11fdf7f2 TL |
427 | {} |
428 | ||
429 | Client::Client(Client&&) = default; | |
430 | Client::~Client() = default; | |
431 | ||
432 | seastar::future<> Client::start() { | |
9f95a23c TL |
433 | entity_name = crimson::common::local_conf()->name; |
434 | auth_registry.refresh_config(); | |
11fdf7f2 | 435 | return load_keyring().then([this] { |
9f95a23c | 436 | return monmap.build_initial(crimson::common::local_conf(), false); |
11fdf7f2 TL |
437 | }).then([this] { |
438 | return authenticate(); | |
9f95a23c TL |
439 | }).then([this] { |
440 | auto interval = | |
441 | std::chrono::duration_cast<seastar::lowres_clock::duration>( | |
442 | std::chrono::duration<double>( | |
443 | local_conf().get_val<double>("mon_client_ping_interval"))); | |
444 | timer.arm_periodic(interval); | |
11fdf7f2 TL |
445 | }); |
446 | } | |
447 | ||
448 | seastar::future<> Client::load_keyring() | |
449 | { | |
9f95a23c | 450 | if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) { |
11fdf7f2 TL |
451 | return seastar::now(); |
452 | } else { | |
9f95a23c TL |
453 | return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) { |
454 | return crimson::auth::load_from_keyfile(keyring); | |
11fdf7f2 | 455 | }).then([](KeyRing* keyring) { |
9f95a23c | 456 | return crimson::auth::load_from_key(keyring); |
11fdf7f2 TL |
457 | }).then([](KeyRing*) { |
458 | return seastar::now(); | |
459 | }); | |
460 | } | |
461 | } | |
462 | ||
463 | void Client::tick() | |
464 | { | |
f67539c2 | 465 | gate.dispatch_in_background(__func__, *this, [this] { |
9f95a23c | 466 | if (active_con) { |
20effc67 | 467 | return seastar::when_all_succeed(wait_for_send_log(), |
1e59de90 | 468 | active_con->get_conn()->send_keepalive(), |
9f95a23c | 469 | active_con->renew_tickets(), |
1e59de90 | 470 | active_con->renew_rotating_keyring()).discard_result(); |
9f95a23c | 471 | } else { |
20effc67 TL |
472 | assert(is_hunting()); |
473 | logger().info("{} continuing the hunt", __func__); | |
474 | return authenticate(); | |
9f95a23c | 475 | } |
11fdf7f2 TL |
476 | }); |
477 | } | |
478 | ||
20effc67 TL |
479 | seastar::future<> Client::wait_for_send_log() { |
480 | utime_t now = ceph_clock_now(); | |
481 | if (now > last_send_log + cct._conf->mon_client_log_interval) { | |
482 | last_send_log = now; | |
483 | return send_log(log_flushing_t::NO_FLUSH); | |
484 | } | |
485 | return seastar::now(); | |
486 | } | |
487 | ||
488 | seastar::future<> Client::send_log(log_flushing_t flush_flag) { | |
489 | if (log_client) { | |
490 | if (auto lm = log_client->get_mon_log_message(flush_flag); lm) { | |
491 | return send_message(std::move(lm)); | |
492 | } | |
493 | more_log_pending = log_client->are_pending(); | |
494 | } | |
495 | return seastar::now(); | |
496 | } | |
497 | ||
11fdf7f2 TL |
498 | bool Client::is_hunting() const { |
499 | return !active_con; | |
500 | } | |
501 | ||
f67539c2 TL |
502 | std::optional<seastar::future<>> |
503 | Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) | |
11fdf7f2 | 504 | { |
f67539c2 TL |
505 | bool dispatched = true; |
506 | gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { | |
507 | // we only care about these message types | |
508 | switch (m->get_type()) { | |
509 | case CEPH_MSG_MON_MAP: | |
1e59de90 | 510 | return handle_monmap(*conn, boost::static_pointer_cast<MMonMap>(m)); |
f67539c2 TL |
511 | case CEPH_MSG_AUTH_REPLY: |
512 | return handle_auth_reply( | |
1e59de90 | 513 | *conn, boost::static_pointer_cast<MAuthReply>(m)); |
f67539c2 TL |
514 | case CEPH_MSG_MON_SUBSCRIBE_ACK: |
515 | return handle_subscribe_ack( | |
516 | boost::static_pointer_cast<MMonSubscribeAck>(m)); | |
517 | case CEPH_MSG_MON_GET_VERSION_REPLY: | |
518 | return handle_get_version_reply( | |
519 | boost::static_pointer_cast<MMonGetVersionReply>(m)); | |
520 | case MSG_MON_COMMAND_ACK: | |
521 | return handle_mon_command_ack( | |
522 | boost::static_pointer_cast<MMonCommandAck>(m)); | |
523 | case MSG_LOGACK: | |
524 | return handle_log_ack( | |
525 | boost::static_pointer_cast<MLogAck>(m)); | |
526 | case MSG_CONFIG: | |
527 | return handle_config( | |
528 | boost::static_pointer_cast<MConfig>(m)); | |
529 | default: | |
530 | dispatched = false; | |
531 | return seastar::now(); | |
532 | } | |
533 | }); | |
534 | return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); | |
11fdf7f2 TL |
535 | } |
536 | ||
f67539c2 | 537 | void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */) |
11fdf7f2 | 538 | { |
f67539c2 TL |
539 | gate.dispatch_in_background(__func__, *this, [this, conn] { |
540 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
541 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
542 | return mc->is_my_peer(peer_addr); | |
543 | }); | |
544 | if (found != pending_conns.end()) { | |
545 | logger().warn("pending conn reset by {}", conn->get_peer_addr()); | |
546 | (*found)->close(); | |
20effc67 | 547 | pending_conns.erase(found); |
f67539c2 TL |
548 | return seastar::now(); |
549 | } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { | |
550 | logger().warn("active conn reset {}", conn->get_peer_addr()); | |
20effc67 TL |
551 | return reopen_session(-1).then([this](bool opened) { |
552 | if (opened) { | |
553 | return on_session_opened(); | |
554 | } else { | |
555 | return seastar::now(); | |
556 | } | |
f67539c2 TL |
557 | }); |
558 | } else { | |
559 | return seastar::now(); | |
560 | } | |
561 | }); | |
11fdf7f2 TL |
562 | } |
563 | ||
9f95a23c TL |
564 | std::pair<std::vector<uint32_t>, std::vector<uint32_t>> |
565 | Client::get_supported_auth_methods(int peer_type) | |
566 | { | |
567 | std::vector<uint32_t> methods; | |
568 | std::vector<uint32_t> modes; | |
569 | auth_registry.get_supported_methods(peer_type, &methods, &modes); | |
570 | return {methods, modes}; | |
571 | } | |
572 | ||
573 | uint32_t Client::pick_con_mode(int peer_type, | |
574 | uint32_t auth_method, | |
575 | const std::vector<uint32_t>& preferred_modes) | |
576 | { | |
577 | return auth_registry.pick_mode(peer_type, auth_method, preferred_modes); | |
578 | } | |
579 | ||
580 | AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type, | |
581 | int auth_method) | |
582 | { | |
583 | return auth_registry.get_handler(peer_type, auth_method); | |
584 | } | |
585 | ||
586 | ||
1e59de90 TL |
587 | int Client::handle_auth_request(crimson::net::Connection &conn, |
588 | AuthConnectionMeta &auth_meta, | |
9f95a23c TL |
589 | bool more, |
590 | uint32_t auth_method, | |
591 | const ceph::bufferlist& payload, | |
1e59de90 | 592 | uint64_t *p_peer_global_id, |
9f95a23c TL |
593 | ceph::bufferlist *reply) |
594 | { | |
9f95a23c | 595 | if (payload.length() == 0) { |
1e59de90 | 596 | return -EACCES; |
9f95a23c | 597 | } |
1e59de90 TL |
598 | auth_meta.auth_mode = payload[0]; |
599 | if (auth_meta.auth_mode < AUTH_MODE_AUTHORIZER || | |
600 | auth_meta.auth_mode > AUTH_MODE_AUTHORIZER_MAX) { | |
9f95a23c TL |
601 | return -EACCES; |
602 | } | |
1e59de90 | 603 | AuthAuthorizeHandler* ah = get_auth_authorize_handler(conn.get_peer_type(), |
9f95a23c TL |
604 | auth_method); |
605 | if (!ah) { | |
606 | logger().error("no AuthAuthorizeHandler found for auth method: {}", | |
607 | auth_method); | |
608 | return -EOPNOTSUPP; | |
609 | } | |
1e59de90 TL |
610 | auto authorizer_challenge = &auth_meta.authorizer_challenge; |
611 | if (auth_meta.skip_authorizer_challenge) { | |
612 | logger().info("skipping challenge on {}", conn); | |
9f95a23c TL |
613 | authorizer_challenge = nullptr; |
614 | } | |
20effc67 TL |
615 | if (!active_con) { |
616 | logger().info("auth request during inactivity period"); | |
617 | // let's instruct the client to come back later | |
618 | return -EBUSY; | |
619 | } | |
1e59de90 | 620 | bool was_challenge = (bool)auth_meta.authorizer_challenge; |
9f95a23c TL |
621 | EntityName name; |
622 | AuthCapsInfo caps_info; | |
623 | bool is_valid = ah->verify_authorizer( | |
624 | &cct, | |
625 | active_con->get_keys(), | |
626 | payload, | |
1e59de90 | 627 | auth_meta.get_connection_secret_length(), |
9f95a23c TL |
628 | reply, |
629 | &name, | |
1e59de90 | 630 | p_peer_global_id, |
9f95a23c | 631 | &caps_info, |
1e59de90 TL |
632 | &auth_meta.session_key, |
633 | &auth_meta.connection_secret, | |
9f95a23c TL |
634 | authorizer_challenge); |
635 | if (is_valid) { | |
636 | auth_handler.handle_authentication(name, caps_info); | |
637 | return 1; | |
638 | } | |
1e59de90 TL |
639 | if (!more && !was_challenge && auth_meta.authorizer_challenge) { |
640 | logger().info("added challenge on {}", conn); | |
9f95a23c TL |
641 | return 0; |
642 | } else { | |
1e59de90 | 643 | logger().info("bad authorizer on {}", conn); |
9f95a23c TL |
644 | return -EACCES; |
645 | } | |
646 | } | |
647 | ||
648 | auth::AuthClient::auth_request_t | |
1e59de90 TL |
649 | Client::get_auth_request(crimson::net::Connection &conn, |
650 | AuthConnectionMeta &auth_meta) | |
9f95a23c | 651 | { |
1e59de90 TL |
652 | logger().info("get_auth_request(conn={}, auth_method={})", |
653 | conn, auth_meta.auth_method); | |
9f95a23c | 654 | // connection to mon? |
1e59de90 | 655 | if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { |
9f95a23c | 656 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
1e59de90 | 657 | [peer_addr = conn.get_peer_addr()](auto& mc) { |
9f95a23c TL |
658 | return mc->is_my_peer(peer_addr); |
659 | }); | |
660 | if (found == pending_conns.end()) { | |
661 | throw crimson::auth::error{"unknown connection"}; | |
662 | } | |
663 | return (*found)->get_auth_request(entity_name, want_keys); | |
664 | } else { | |
665 | // generate authorizer | |
666 | if (!active_con) { | |
667 | logger().error(" but no auth handler is set up"); | |
668 | throw crimson::auth::error("no auth available"); | |
669 | } | |
1e59de90 | 670 | auto authorizer = active_con->get_authorizer(conn.get_peer_type()); |
9f95a23c TL |
671 | if (!authorizer) { |
672 | logger().error("failed to build_authorizer for type {}", | |
1e59de90 | 673 | ceph_entity_type_name(conn.get_peer_type())); |
9f95a23c TL |
674 | throw crimson::auth::error("unable to build auth"); |
675 | } | |
1e59de90 TL |
676 | auth_meta.authorizer.reset(authorizer); |
677 | auth_meta.auth_method = authorizer->protocol; | |
9f95a23c | 678 | vector<uint32_t> modes; |
1e59de90 TL |
679 | auth_registry.get_supported_modes(conn.get_peer_type(), |
680 | auth_meta.auth_method, | |
9f95a23c TL |
681 | &modes); |
682 | return {authorizer->protocol, modes, authorizer->bl}; | |
683 | } | |
684 | } | |
685 | ||
1e59de90 TL |
686 | ceph::bufferlist Client::handle_auth_reply_more(crimson::net::Connection &conn, |
687 | AuthConnectionMeta &auth_meta, | |
9f95a23c TL |
688 | const bufferlist& bl) |
689 | { | |
1e59de90 | 690 | if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { |
9f95a23c | 691 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
1e59de90 | 692 | [peer_addr = conn.get_peer_addr()](auto& mc) { |
9f95a23c TL |
693 | return mc->is_my_peer(peer_addr); |
694 | }); | |
695 | if (found == pending_conns.end()) { | |
696 | throw crimson::auth::error{"unknown connection"}; | |
697 | } | |
698 | bufferlist reply; | |
1e59de90 | 699 | tie(auth_meta.session_key, auth_meta.connection_secret, reply) = |
9f95a23c TL |
700 | (*found)->handle_auth_reply_more(bl); |
701 | return reply; | |
702 | } else { | |
703 | // authorizer challenges | |
1e59de90 | 704 | if (!active_con || !auth_meta.authorizer) { |
9f95a23c TL |
705 | logger().error("no authorizer?"); |
706 | throw crimson::auth::error("no auth available"); | |
707 | } | |
1e59de90 TL |
708 | auth_meta.authorizer->add_challenge(&cct, bl); |
709 | return auth_meta.authorizer->bl; | |
9f95a23c TL |
710 | } |
711 | } | |
712 | ||
1e59de90 TL |
713 | int Client::handle_auth_done(crimson::net::Connection &conn, |
714 | AuthConnectionMeta &auth_meta, | |
9f95a23c | 715 | uint64_t global_id, |
20effc67 | 716 | uint32_t /*con_mode*/, |
9f95a23c TL |
717 | const bufferlist& bl) |
718 | { | |
1e59de90 | 719 | if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { |
9f95a23c | 720 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
1e59de90 | 721 | [peer_addr = conn.get_peer_addr()](auto& mc) { |
9f95a23c TL |
722 | return mc->is_my_peer(peer_addr); |
723 | }); | |
724 | if (found == pending_conns.end()) { | |
725 | return -ENOENT; | |
726 | } | |
727 | int r = 0; | |
1e59de90 | 728 | tie(auth_meta.session_key, auth_meta.connection_secret, r) = |
9f95a23c TL |
729 | (*found)->handle_auth_done(global_id, bl); |
730 | return r; | |
731 | } else { | |
732 | // verify authorizer reply | |
733 | auto p = bl.begin(); | |
1e59de90 | 734 | if (!auth_meta.authorizer->verify_reply(p, &auth_meta.connection_secret)) { |
9f95a23c TL |
735 | logger().error("failed verifying authorizer reply"); |
736 | return -EACCES; | |
737 | } | |
1e59de90 | 738 | auth_meta.session_key = auth_meta.authorizer->session_key; |
9f95a23c TL |
739 | return 0; |
740 | } | |
741 | } | |
742 | ||
743 | // Handle server's indication that the previous auth attempt failed | |
1e59de90 TL |
744 | int Client::handle_auth_bad_method(crimson::net::Connection &conn, |
745 | AuthConnectionMeta &auth_meta, | |
9f95a23c TL |
746 | uint32_t old_auth_method, |
747 | int result, | |
748 | const std::vector<uint32_t>& allowed_methods, | |
749 | const std::vector<uint32_t>& allowed_modes) | |
750 | { | |
1e59de90 | 751 | if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { |
9f95a23c | 752 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
1e59de90 | 753 | [peer_addr = conn.get_peer_addr()](auto& mc) { |
9f95a23c TL |
754 | return mc->is_my_peer(peer_addr); |
755 | }); | |
756 | if (found != pending_conns.end()) { | |
757 | return (*found)->handle_auth_bad_method( | |
758 | old_auth_method, result, | |
759 | allowed_methods, allowed_modes); | |
760 | } else { | |
761 | return -ENOENT; | |
762 | } | |
763 | } else { | |
764 | // huh... | |
765 | logger().info("hmm, they didn't like {} result {}", | |
766 | old_auth_method, cpp_strerror(result)); | |
767 | return -EACCES; | |
768 | } | |
769 | } | |
770 | ||
1e59de90 | 771 | seastar::future<> Client::handle_monmap(crimson::net::Connection &conn, |
11fdf7f2 TL |
772 | Ref<MMonMap> m) |
773 | { | |
774 | monmap.decode(m->monmapbl); | |
1e59de90 | 775 | const auto peer_addr = conn.get_peer_addr(); |
11fdf7f2 TL |
776 | auto cur_mon = monmap.get_name(peer_addr); |
777 | logger().info("got monmap {}, mon.{}, is now rank {}", | |
778 | monmap.epoch, cur_mon, monmap.get_rank(cur_mon)); | |
779 | sub.got("monmap", monmap.get_epoch()); | |
780 | ||
781 | if (monmap.get_addr_name(peer_addr, cur_mon)) { | |
9f95a23c TL |
782 | if (active_con) { |
783 | logger().info("handle_monmap: renewing tickets"); | |
784 | return seastar::when_all_succeed( | |
785 | active_con->renew_tickets(), | |
1e59de90 | 786 | active_con->renew_rotating_keyring()).then_unpack([] { |
9f95a23c TL |
787 | logger().info("handle_mon_map: renewed tickets"); |
788 | }); | |
789 | } else { | |
790 | return seastar::now(); | |
791 | } | |
11fdf7f2 TL |
792 | } else { |
793 | logger().warn("mon.{} went away", cur_mon); | |
20effc67 TL |
794 | return reopen_session(-1).then([this](bool opened) { |
795 | if (opened) { | |
796 | return on_session_opened(); | |
797 | } else { | |
798 | return seastar::now(); | |
799 | } | |
f67539c2 | 800 | }); |
11fdf7f2 TL |
801 | } |
802 | } | |
803 | ||
1e59de90 | 804 | seastar::future<> Client::handle_auth_reply(crimson::net::Connection &conn, |
f67539c2 | 805 | Ref<MAuthReply> m) |
11fdf7f2 | 806 | { |
1e59de90 TL |
807 | logger().info("handle_auth_reply {} returns {}: {}", |
808 | conn, *m, m->result); | |
11fdf7f2 | 809 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
1e59de90 | 810 | [peer_addr = conn.get_peer_addr()](auto& mc) { |
9f95a23c | 811 | return mc->is_my_peer(peer_addr); |
11fdf7f2 TL |
812 | }); |
813 | if (found != pending_conns.end()) { | |
9f95a23c | 814 | return (*found)->handle_auth_reply(m); |
11fdf7f2 | 815 | } else if (active_con) { |
1e59de90 TL |
816 | return active_con->handle_auth_reply(m).then([this] { |
817 | return seastar::when_all_succeed( | |
818 | active_con->renew_rotating_keyring(), | |
819 | active_con->renew_tickets()).discard_result(); | |
820 | }); | |
11fdf7f2 | 821 | } else { |
1e59de90 | 822 | logger().error("unknown auth reply from {}", conn.get_peer_addr()); |
11fdf7f2 TL |
823 | return seastar::now(); |
824 | } | |
825 | } | |
826 | ||
827 | seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m) | |
828 | { | |
829 | sub.acked(m->interval); | |
830 | return seastar::now(); | |
831 | } | |
832 | ||
833 | Client::get_version_t Client::get_version(const std::string& map) | |
834 | { | |
20effc67 | 835 | auto m = crimson::make_message<MMonGetVersion>(); |
11fdf7f2 TL |
836 | auto tid = ++last_version_req_id; |
837 | m->handle = tid; | |
838 | m->what = map; | |
839 | auto& req = version_reqs[tid]; | |
20effc67 | 840 | return send_message(std::move(m)).then([&req] { |
11fdf7f2 TL |
841 | return req.get_future(); |
842 | }); | |
843 | } | |
844 | ||
845 | seastar::future<> | |
846 | Client::handle_get_version_reply(Ref<MMonGetVersionReply> m) | |
847 | { | |
848 | if (auto found = version_reqs.find(m->handle); | |
849 | found != version_reqs.end()) { | |
850 | auto& result = found->second; | |
851 | logger().trace("{}: {} returns {}", | |
852 | __func__, m->handle, m->version); | |
f67539c2 | 853 | result.set_value(std::make_tuple(m->version, m->oldest_version)); |
11fdf7f2 TL |
854 | version_reqs.erase(found); |
855 | } else { | |
856 | logger().warn("{}: version request with handle {} not found", | |
857 | __func__, m->handle); | |
858 | } | |
859 | return seastar::now(); | |
860 | } | |
861 | ||
862 | seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m) | |
863 | { | |
864 | const auto tid = m->get_tid(); | |
20effc67 TL |
865 | if (auto found = std::find_if(mon_commands.begin(), |
866 | mon_commands.end(), | |
867 | [tid](auto& cmd) { | |
868 | return cmd.req->get_tid() == tid; | |
869 | }); | |
11fdf7f2 | 870 | found != mon_commands.end()) { |
20effc67 | 871 | auto& command = *found; |
11fdf7f2 | 872 | logger().trace("{} {}", __func__, tid); |
20effc67 | 873 | command.result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data()))); |
11fdf7f2 TL |
874 | mon_commands.erase(found); |
875 | } else { | |
876 | logger().warn("{} {} not found", __func__, tid); | |
877 | } | |
878 | return seastar::now(); | |
879 | } | |
880 | ||
881 | seastar::future<> Client::handle_log_ack(Ref<MLogAck> m) | |
882 | { | |
20effc67 TL |
883 | if (log_client) { |
884 | return log_client->handle_log_ack(m).then([this] { | |
885 | if (more_log_pending) { | |
886 | return send_log(log_flushing_t::NO_FLUSH); | |
887 | } else { | |
888 | return seastar::now(); | |
889 | } | |
890 | }); | |
891 | } | |
11fdf7f2 TL |
892 | return seastar::now(); |
893 | } | |
894 | ||
895 | seastar::future<> Client::handle_config(Ref<MConfig> m) | |
896 | { | |
20effc67 TL |
897 | return crimson::common::local_conf().set_mon_vals(m->config).then([this] { |
898 | if (config_updated) { | |
899 | config_updated->set_value(); | |
900 | } | |
901 | }); | |
11fdf7f2 TL |
902 | } |
903 | ||
904 | std::vector<unsigned> Client::get_random_mons(unsigned n) const | |
905 | { | |
906 | uint16_t min_priority = std::numeric_limits<uint16_t>::max(); | |
907 | for (const auto& m : monmap.mon_info) { | |
908 | if (m.second.priority < min_priority) { | |
909 | min_priority = m.second.priority; | |
910 | } | |
911 | } | |
912 | vector<unsigned> ranks; | |
913 | for (auto [name, info] : monmap.mon_info) { | |
11fdf7f2 TL |
914 | if (info.priority == min_priority) { |
915 | ranks.push_back(monmap.get_rank(name)); | |
916 | } | |
917 | } | |
918 | std::random_device rd; | |
919 | std::default_random_engine rng{rd()}; | |
920 | std::shuffle(ranks.begin(), ranks.end(), rng); | |
921 | if (n == 0 || n > ranks.size()) { | |
922 | return ranks; | |
923 | } else { | |
924 | return {ranks.begin(), ranks.begin() + n}; | |
925 | } | |
926 | } | |
927 | ||
928 | seastar::future<> Client::authenticate() | |
929 | { | |
20effc67 TL |
930 | return reopen_session(-1).then([this](bool opened) { |
931 | if (opened) { | |
932 | return on_session_opened(); | |
933 | } else { | |
934 | return seastar::now(); | |
935 | } | |
f67539c2 | 936 | }); |
11fdf7f2 TL |
937 | } |
938 | ||
939 | seastar::future<> Client::stop() | |
940 | { | |
f67539c2 TL |
941 | logger().info("{}", __func__); |
942 | auto fut = gate.close(); | |
943 | timer.cancel(); | |
20effc67 | 944 | ready_to_send = false; |
f67539c2 TL |
945 | for (auto& pending_con : pending_conns) { |
946 | pending_con->close(); | |
947 | } | |
948 | if (active_con) { | |
949 | active_con->close(); | |
950 | } | |
951 | return fut; | |
11fdf7f2 TL |
952 | } |
953 | ||
20effc67 TL |
954 | static entity_addr_t choose_client_addr( |
955 | const entity_addrvec_t& my_addrs, | |
956 | const entity_addrvec_t& client_addrs) | |
957 | { | |
958 | // here is where we decide which of the addrs to connect to. always prefer | |
959 | // the first one, if we support it. | |
960 | for (const auto& a : client_addrs.v) { | |
961 | if (a.is_msgr2()) { | |
962 | // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before | |
963 | // trying it? for now, just pick whichever is listed first. | |
964 | return a; | |
965 | } | |
966 | } | |
967 | return entity_addr_t{}; | |
968 | } | |
969 | ||
970 | seastar::future<bool> Client::reopen_session(int rank) | |
11fdf7f2 | 971 | { |
9f95a23c | 972 | logger().info("{} to mon.{}", __func__, rank); |
20effc67 TL |
973 | ready_to_send = false; |
974 | if (active_con) { | |
975 | active_con->close(); | |
976 | active_con = nullptr; | |
977 | ceph_assert(pending_conns.empty()); | |
978 | } else { | |
979 | for (auto& pending_con : pending_conns) { | |
980 | pending_con->close(); | |
981 | } | |
982 | pending_conns.clear(); | |
983 | } | |
11fdf7f2 TL |
984 | vector<unsigned> mons; |
985 | if (rank >= 0) { | |
986 | mons.push_back(rank); | |
987 | } else { | |
988 | const auto parallel = | |
9f95a23c | 989 | crimson::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel"); |
11fdf7f2 TL |
990 | mons = get_random_mons(parallel); |
991 | } | |
992 | pending_conns.reserve(mons.size()); | |
993 | return seastar::parallel_for_each(mons, [this](auto rank) { | |
20effc67 TL |
994 | auto peer = choose_client_addr(msgr.get_myaddrs(), |
995 | monmap.get_addrs(rank)); | |
f67539c2 TL |
996 | if (peer == entity_addr_t{}) { |
997 | // crimson msgr only uses the first bound addr | |
998 | logger().warn("mon.{} does not have an addr compatible with me", rank); | |
999 | return seastar::now(); | |
1000 | } | |
11fdf7f2 | 1001 | logger().info("connecting to mon.{}", rank); |
20effc67 TL |
1002 | auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); |
1003 | auto& mc = pending_conns.emplace_back( | |
1004 | seastar::make_shared<Connection>(auth_registry, conn, &keyring)); | |
1005 | assert(conn->get_peer_addr().is_msgr2()); | |
1006 | return mc->authenticate_v2().then([peer, this](auto result) { | |
f67539c2 TL |
1007 | if (result == Connection::auth_result_t::success) { |
1008 | _finish_auth(peer); | |
9f95a23c | 1009 | } |
9f95a23c TL |
1010 | logger().debug("reopen_session mon connection attempts complete"); |
1011 | }).handle_exception([](auto ep) { | |
1012 | logger().error("mon connections failed with ep {}", ep); | |
1013 | return seastar::make_exception_future(ep); | |
11fdf7f2 TL |
1014 | }); |
1015 | }).then([this] { | |
20effc67 TL |
1016 | if (active_con) { |
1017 | return true; | |
1018 | } else { | |
1019 | logger().warn("cannot establish the active_con with any mon"); | |
1020 | return false; | |
f67539c2 | 1021 | } |
11fdf7f2 TL |
1022 | }); |
1023 | } | |
1024 | ||
f67539c2 TL |
1025 | void Client::_finish_auth(const entity_addr_t& peer) |
1026 | { | |
1027 | if (!is_hunting()) { | |
1028 | return; | |
1029 | } | |
1030 | logger().info("found mon.{}", monmap.get_name(peer)); | |
1031 | ||
1032 | auto found = std::find_if( | |
1033 | pending_conns.begin(), pending_conns.end(), | |
1034 | [peer](auto& conn) { | |
1035 | return conn->is_my_peer(peer); | |
1036 | }); | |
1037 | if (found == pending_conns.end()) { | |
1038 | // Happens if another connection has won the race | |
1039 | ceph_assert(active_con && pending_conns.empty()); | |
1040 | logger().info("no pending connection for mon.{}, peer {}", | |
1041 | monmap.get_name(peer), peer); | |
1042 | return; | |
1043 | } | |
1044 | ||
1045 | ceph_assert(!active_con && !pending_conns.empty()); | |
20effc67 TL |
1046 | // It's too early to toggle the `ready_to_send` flag. It will |
1047 | // be set atfer finishing the MAuth exchange and draining out | |
1048 | // the `pending_messages` queue. | |
f67539c2 | 1049 | active_con = std::move(*found); |
20effc67 | 1050 | *found = nullptr; |
f67539c2 TL |
1051 | for (auto& conn : pending_conns) { |
1052 | if (conn) { | |
1053 | conn->close(); | |
1054 | } | |
1055 | } | |
1056 | pending_conns.clear(); | |
1057 | } | |
1058 | ||
11fdf7f2 | 1059 | Client::command_result_t |
20effc67 TL |
1060 | Client::run_command(std::string&& cmd, |
1061 | bufferlist&& bl) | |
11fdf7f2 | 1062 | { |
20effc67 | 1063 | auto m = crimson::make_message<MMonCommand>(monmap.fsid); |
11fdf7f2 TL |
1064 | auto tid = ++last_mon_command_id; |
1065 | m->set_tid(tid); | |
20effc67 TL |
1066 | m->cmd = {std::move(cmd)}; |
1067 | m->set_data(std::move(bl)); | |
1e59de90 | 1068 | auto& command = mon_commands.emplace_back(crimson::make_message<MMonCommand>(*m)); |
20effc67 TL |
1069 | return send_message(std::move(m)).then([&result=command.result] { |
1070 | return result.get_future(); | |
11fdf7f2 TL |
1071 | }); |
1072 | } | |
1073 | ||
20effc67 | 1074 | seastar::future<> Client::send_message(MessageURef m) |
11fdf7f2 | 1075 | { |
20effc67 TL |
1076 | if (active_con && ready_to_send) { |
1077 | assert(pending_messages.empty()); | |
1078 | return active_con->get_conn()->send(std::move(m)); | |
1079 | } else { | |
1080 | auto& delayed = pending_messages.emplace_back(std::move(m)); | |
1081 | return delayed.pr.get_future(); | |
f67539c2 | 1082 | } |
f67539c2 TL |
1083 | } |
1084 | ||
20effc67 | 1085 | seastar::future<> Client::on_session_opened() |
f67539c2 | 1086 | { |
20effc67 TL |
1087 | return active_con->renew_rotating_keyring().then([this] { |
1088 | if (!active_con) { | |
1089 | // the connection can be closed even in the middle of the opening sequence | |
1090 | logger().info("on_session_opened {}: connection closed", __LINE__); | |
1091 | return seastar::now(); | |
1092 | } | |
f67539c2 | 1093 | for (auto& m : pending_messages) { |
20effc67 | 1094 | (void) active_con->get_conn()->send(std::move(m.msg)); |
f67539c2 TL |
1095 | m.pr.set_value(); |
1096 | } | |
1097 | pending_messages.clear(); | |
20effc67 TL |
1098 | ready_to_send = true; |
1099 | return sub.reload() ? renew_subs() : seastar::now(); | |
1100 | }).then([this] { | |
1101 | if (!active_con) { | |
1102 | logger().info("on_session_opened {}: connection closed", __LINE__); | |
1103 | return seastar::now(); | |
1104 | } | |
1105 | return seastar::parallel_for_each(mon_commands, | |
1106 | [this](auto &command) { | |
1107 | return send_message(crimson::make_message<MMonCommand>(*command.req)); | |
1108 | }); | |
1109 | }); | |
11fdf7f2 TL |
1110 | } |
1111 | ||
1112 | bool Client::sub_want(const std::string& what, version_t start, unsigned flags) | |
1113 | { | |
1114 | return sub.want(what, start, flags); | |
1115 | } | |
1116 | ||
1117 | void Client::sub_got(const std::string& what, version_t have) | |
1118 | { | |
1119 | sub.got(what, have); | |
1120 | } | |
1121 | ||
1122 | void Client::sub_unwant(const std::string& what) | |
1123 | { | |
1124 | sub.unwant(what); | |
1125 | } | |
1126 | ||
1127 | bool Client::sub_want_increment(const std::string& what, | |
1128 | version_t start, | |
1129 | unsigned flags) | |
1130 | { | |
1131 | return sub.inc_want(what, start, flags); | |
1132 | } | |
1133 | ||
1134 | seastar::future<> Client::renew_subs() | |
1135 | { | |
1136 | if (!sub.have_new()) { | |
1137 | logger().warn("{} - empty", __func__); | |
1138 | return seastar::now(); | |
1139 | } | |
1140 | logger().trace("{}", __func__); | |
1141 | ||
20effc67 | 1142 | auto m = crimson::make_message<MMonSubscribe>(); |
11fdf7f2 TL |
1143 | m->what = sub.get_subs(); |
1144 | m->hostname = ceph_get_short_hostname(); | |
20effc67 | 1145 | return send_message(std::move(m)).then([this] { |
11fdf7f2 TL |
1146 | sub.renewed(); |
1147 | }); | |
1148 | } | |
1149 | ||
20effc67 TL |
1150 | seastar::future<> Client::wait_for_config() |
1151 | { | |
1152 | assert(!config_updated); | |
1153 | config_updated = seastar::promise<>(); | |
1154 | return config_updated->get_future(); | |
1155 | } | |
1156 | ||
f67539c2 TL |
1157 | void Client::print(std::ostream& out) const |
1158 | { | |
1159 | out << "mon." << entity_name; | |
1160 | } | |
1161 | ||
9f95a23c | 1162 | } // namespace crimson::mon |