]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
11fdf7f2 TL |
4 | #include "MonClient.h" |
5 | ||
6 | #include <random> | |
7 | ||
8 | #include <seastar/core/future-util.hh> | |
9 | #include <seastar/core/lowres_clock.hh> | |
9f95a23c | 10 | #include <seastar/core/shared_future.hh> |
11fdf7f2 TL |
11 | #include <seastar/util/log.hh> |
12 | ||
13 | #include "auth/AuthClientHandler.h" | |
11fdf7f2 TL |
14 | #include "auth/RotatingKeyRing.h" |
15 | ||
16 | #include "common/hostname.h" | |
17 | ||
18 | #include "crimson/auth/KeyRing.h" | |
19 | #include "crimson/common/config_proxy.h" | |
20 | #include "crimson/common/log.h" | |
21 | #include "crimson/net/Connection.h" | |
22 | #include "crimson/net/Errors.h" | |
23 | #include "crimson/net/Messenger.h" | |
24 | ||
25 | #include "messages/MAuth.h" | |
26 | #include "messages/MAuthReply.h" | |
27 | #include "messages/MConfig.h" | |
28 | #include "messages/MLogAck.h" | |
29 | #include "messages/MMonCommand.h" | |
30 | #include "messages/MMonCommandAck.h" | |
9f95a23c | 31 | #include "messages/MMonGetMap.h" |
11fdf7f2 TL |
32 | #include "messages/MMonGetVersion.h" |
33 | #include "messages/MMonGetVersionReply.h" | |
34 | #include "messages/MMonMap.h" | |
35 | #include "messages/MMonSubscribe.h" | |
36 | #include "messages/MMonSubscribeAck.h" | |
37 | ||
38 | namespace { | |
39 | seastar::logger& logger() | |
40 | { | |
9f95a23c | 41 | return crimson::get_logger(ceph_subsys_monc); |
11fdf7f2 TL |
42 | } |
43 | } | |
44 | ||
9f95a23c | 45 | namespace crimson::mon { |
11fdf7f2 | 46 | |
9f95a23c | 47 | using crimson::common::local_conf; |
11fdf7f2 TL |
48 | |
49 | class Connection { | |
50 | public: | |
9f95a23c TL |
51 | Connection(const AuthRegistry& auth_registry, |
52 | crimson::net::ConnectionRef conn, | |
53 | KeyRing* keyring); | |
f67539c2 | 54 | enum class auth_result_t { |
9f95a23c TL |
55 | success = 0, |
56 | failure, | |
57 | canceled | |
58 | }; | |
11fdf7f2 | 59 | seastar::future<> handle_auth_reply(Ref<MAuthReply> m); |
9f95a23c | 60 | // v1 |
f67539c2 | 61 | seastar::future<auth_result_t> authenticate_v1( |
9f95a23c TL |
62 | epoch_t epoch, |
63 | const EntityName& name, | |
64 | uint32_t want_keys); | |
65 | // v2 | |
f67539c2 | 66 | seastar::future<auth_result_t> authenticate_v2(); |
9f95a23c TL |
67 | auth::AuthClient::auth_request_t |
68 | get_auth_request(const EntityName& name, | |
69 | uint32_t want_keys); | |
70 | using secret_t = string; | |
71 | tuple<CryptoKey, secret_t, bufferlist> | |
72 | handle_auth_reply_more(const ceph::buffer::list& bl); | |
9f95a23c TL |
73 | int handle_auth_bad_method(uint32_t old_auth_method, |
74 | int result, | |
75 | const std::vector<uint32_t>& allowed_methods, | |
76 | const std::vector<uint32_t>& allowed_modes); | |
77 | ||
78 | // v1 and v2 | |
f67539c2 TL |
79 | tuple<CryptoKey, secret_t, int> |
80 | handle_auth_done(uint64_t new_global_id, | |
81 | const ceph::buffer::list& bl); | |
82 | void close(); | |
11fdf7f2 | 83 | bool is_my_peer(const entity_addr_t& addr) const; |
9f95a23c TL |
84 | AuthAuthorizer* get_authorizer(entity_type_t peer) const; |
85 | KeyStore& get_keys(); | |
11fdf7f2 | 86 | seastar::future<> renew_tickets(); |
9f95a23c TL |
87 | seastar::future<> renew_rotating_keyring(); |
88 | ||
89 | crimson::net::ConnectionRef get_conn(); | |
11fdf7f2 TL |
90 | |
91 | private: | |
92 | seastar::future<> setup_session(epoch_t epoch, | |
9f95a23c TL |
93 | const EntityName& name); |
94 | std::unique_ptr<AuthClientHandler> create_auth(crimson::auth::method_t, | |
95 | uint64_t global_id, | |
96 | const EntityName& name, | |
97 | uint32_t want_keys); | |
98 | enum class request_t { | |
99 | rotating, | |
100 | general, | |
101 | }; | |
f67539c2 TL |
102 | seastar::future<std::optional<auth_result_t>> do_auth_single(request_t); |
103 | seastar::future<auth_result_t> do_auth(request_t); | |
11fdf7f2 TL |
104 | |
105 | private: | |
106 | bool closed = false; | |
9f95a23c TL |
107 | // v1 |
108 | seastar::shared_promise<Ref<MAuthReply>> reply; | |
109 | // v2 | |
110 | using clock_t = seastar::lowres_system_clock; | |
111 | clock_t::time_point auth_start; | |
112 | crimson::auth::method_t auth_method = 0; | |
f67539c2 | 113 | std::optional<seastar::promise<auth_result_t>> auth_done; |
9f95a23c TL |
114 | // v1 and v2 |
115 | const AuthRegistry& auth_registry; | |
116 | crimson::net::ConnectionRef conn; | |
11fdf7f2 | 117 | std::unique_ptr<AuthClientHandler> auth; |
9f95a23c TL |
118 | std::unique_ptr<RotatingKeyRing> rotating_keyring; |
119 | uint64_t global_id = 0; | |
120 | clock_t::time_point last_rotating_renew_sent; | |
11fdf7f2 TL |
121 | }; |
122 | ||
9f95a23c TL |
123 | Connection::Connection(const AuthRegistry& auth_registry, |
124 | crimson::net::ConnectionRef conn, | |
11fdf7f2 | 125 | KeyRing* keyring) |
9f95a23c TL |
126 | : auth_registry{auth_registry}, |
127 | conn{conn}, | |
128 | rotating_keyring{ | |
129 | std::make_unique<RotatingKeyRing>(nullptr, | |
130 | CEPH_ENTITY_TYPE_OSD, | |
131 | keyring)} | |
11fdf7f2 TL |
132 | {} |
133 | ||
134 | seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m) | |
135 | { | |
136 | reply.set_value(m); | |
9f95a23c | 137 | reply = {}; |
11fdf7f2 TL |
138 | return seastar::now(); |
139 | } | |
140 | ||
141 | seastar::future<> Connection::renew_tickets() | |
142 | { | |
143 | if (auth->need_tickets()) { | |
f67539c2 TL |
144 | return do_auth(request_t::general).then([](auth_result_t r) { |
145 | if (r != auth_result_t::success) { | |
9f95a23c TL |
146 | throw std::system_error( |
147 | make_error_code( | |
148 | crimson::net::error::negotiation_failure)); | |
11fdf7f2 TL |
149 | } |
150 | }); | |
151 | } | |
152 | return seastar::now(); | |
153 | } | |
154 | ||
9f95a23c TL |
155 | seastar::future<> Connection::renew_rotating_keyring() |
156 | { | |
157 | auto now = clock_t::now(); | |
158 | auto ttl = std::chrono::seconds{ | |
159 | static_cast<long>(crimson::common::local_conf()->auth_service_ticket_ttl)}; | |
160 | auto cutoff = now - ttl / 4; | |
161 | if (!rotating_keyring->need_new_secrets(utime_t(cutoff))) { | |
162 | return seastar::now(); | |
163 | } | |
164 | if (now - last_rotating_renew_sent < std::chrono::seconds{1}) { | |
165 | logger().info("renew_rotating_keyring called too often"); | |
166 | return seastar::now(); | |
167 | } | |
168 | last_rotating_renew_sent = now; | |
f67539c2 TL |
169 | return do_auth(request_t::rotating).then([](auth_result_t r) { |
170 | if (r != auth_result_t::success) { | |
9f95a23c TL |
171 | throw std::system_error(make_error_code( |
172 | crimson::net::error::negotiation_failure)); | |
173 | } | |
174 | }); | |
175 | } | |
176 | ||
177 | AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const | |
178 | { | |
179 | if (auth) { | |
180 | return auth->build_authorizer(peer); | |
181 | } else { | |
182 | return nullptr; | |
183 | } | |
184 | } | |
185 | ||
186 | KeyStore& Connection::get_keys() { | |
187 | return *rotating_keyring; | |
188 | } | |
189 | ||
11fdf7f2 | 190 | std::unique_ptr<AuthClientHandler> |
9f95a23c TL |
191 | Connection::create_auth(crimson::auth::method_t protocol, |
192 | uint64_t global_id, | |
11fdf7f2 TL |
193 | const EntityName& name, |
194 | uint32_t want_keys) | |
195 | { | |
9f95a23c | 196 | static crimson::common::CephContext cct; |
11fdf7f2 TL |
197 | std::unique_ptr<AuthClientHandler> auth; |
198 | auth.reset(AuthClientHandler::create(&cct, | |
9f95a23c TL |
199 | protocol, |
200 | rotating_keyring.get())); | |
11fdf7f2 | 201 | if (!auth) { |
9f95a23c | 202 | logger().error("no handler for protocol {}", protocol); |
11fdf7f2 | 203 | throw std::system_error(make_error_code( |
9f95a23c | 204 | crimson::net::error::negotiation_failure)); |
11fdf7f2 TL |
205 | } |
206 | auth->init(name); | |
207 | auth->set_want_keys(want_keys); | |
208 | auth->set_global_id(global_id); | |
11fdf7f2 TL |
209 | return auth; |
210 | } | |
211 | ||
212 | seastar::future<> | |
213 | Connection::setup_session(epoch_t epoch, | |
11fdf7f2 TL |
214 | const EntityName& name) |
215 | { | |
9f95a23c TL |
216 | auto m = ceph::make_message<MAuth>(); |
217 | m->protocol = CEPH_AUTH_UNKNOWN; | |
11fdf7f2 TL |
218 | m->monmap_epoch = epoch; |
219 | __u8 struct_v = 1; | |
220 | encode(struct_v, m->auth_payload); | |
9f95a23c TL |
221 | std::vector<crimson::auth::method_t> auth_methods; |
222 | auth_registry.get_supported_methods(conn->get_peer_type(), &auth_methods); | |
223 | encode(auth_methods, m->auth_payload); | |
11fdf7f2 TL |
224 | encode(name, m->auth_payload); |
225 | encode(global_id, m->auth_payload); | |
226 | return conn->send(m); | |
227 | } | |
228 | ||
f67539c2 | 229 | seastar::future<std::optional<Connection::auth_result_t>> |
9f95a23c | 230 | Connection::do_auth_single(Connection::request_t what) |
11fdf7f2 TL |
231 | { |
232 | auto m = make_message<MAuth>(); | |
233 | m->protocol = auth->get_protocol(); | |
234 | auth->prepare_build_request(); | |
9f95a23c TL |
235 | switch (what) { |
236 | case request_t::rotating: | |
237 | auth->build_rotating_request(m->auth_payload); | |
238 | break; | |
239 | case request_t::general: | |
240 | if (int ret = auth->build_request(m->auth_payload); ret) { | |
241 | logger().error("missing/bad key for '{}'", local_conf()->name); | |
242 | throw std::system_error(make_error_code( | |
243 | crimson::net::error::negotiation_failure)); | |
244 | } | |
245 | break; | |
246 | default: | |
247 | assert(0); | |
11fdf7f2 TL |
248 | } |
249 | logger().info("sending {}", *m); | |
250 | return conn->send(m).then([this] { | |
251 | logger().info("waiting"); | |
9f95a23c | 252 | return reply.get_shared_future(); |
11fdf7f2 | 253 | }).then([this] (Ref<MAuthReply> m) { |
9f95a23c TL |
254 | if (!m) { |
255 | ceph_assert(closed); | |
256 | logger().info("do_auth: connection closed"); | |
f67539c2 TL |
257 | return seastar::make_ready_future<std::optional<Connection::auth_result_t>>( |
258 | std::make_optional(auth_result_t::canceled)); | |
9f95a23c TL |
259 | } |
260 | logger().info( | |
261 | "do_auth: mon {} => {} returns {}: {}", | |
262 | conn->get_messenger()->get_myaddr(), | |
263 | conn->get_peer_addr(), *m, m->result); | |
11fdf7f2 TL |
264 | auto p = m->result_bl.cbegin(); |
265 | auto ret = auth->handle_response(m->result, p, | |
11fdf7f2 TL |
266 | nullptr, nullptr); |
267 | if (ret != 0 && ret != -EAGAIN) { | |
9f95a23c TL |
268 | logger().error( |
269 | "do_auth: got error {} on mon {}", | |
270 | ret, | |
271 | conn->get_peer_addr()); | |
11fdf7f2 | 272 | } |
f67539c2 | 273 | return seastar::make_ready_future<std::optional<Connection::auth_result_t>>( |
9f95a23c TL |
274 | ret == -EAGAIN |
275 | ? std::nullopt | |
276 | : std::make_optional(ret == 0 | |
f67539c2 TL |
277 | ? auth_result_t::success |
278 | : auth_result_t::failure | |
9f95a23c | 279 | )); |
11fdf7f2 TL |
280 | }); |
281 | } | |
282 | ||
f67539c2 | 283 | seastar::future<Connection::auth_result_t> |
9f95a23c TL |
284 | Connection::do_auth(Connection::request_t what) { |
285 | return seastar::repeat_until_value([this, what]() { | |
286 | return do_auth_single(what); | |
287 | }); | |
288 | } | |
289 | ||
f67539c2 | 290 | seastar::future<Connection::auth_result_t> |
9f95a23c TL |
291 | Connection::authenticate_v1(epoch_t epoch, |
292 | const EntityName& name, | |
293 | uint32_t want_keys) | |
11fdf7f2 | 294 | { |
9f95a23c TL |
295 | return conn->keepalive().then([epoch, name, this] { |
296 | return setup_session(epoch, name); | |
11fdf7f2 | 297 | }).then([this] { |
9f95a23c | 298 | return reply.get_shared_future(); |
11fdf7f2 | 299 | }).then([name, want_keys, this](Ref<MAuthReply> m) { |
9f95a23c TL |
300 | if (!m) { |
301 | logger().error("authenticate_v1 canceled on {}", name); | |
f67539c2 | 302 | return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled); |
9f95a23c | 303 | } |
11fdf7f2 | 304 | global_id = m->global_id; |
9f95a23c | 305 | auth = create_auth(m->protocol, m->global_id, name, want_keys); |
11fdf7f2 TL |
306 | switch (auto p = m->result_bl.cbegin(); |
307 | auth->handle_response(m->result, p, | |
11fdf7f2 TL |
308 | nullptr, nullptr)) { |
309 | case 0: | |
310 | // none | |
f67539c2 | 311 | return seastar::make_ready_future<auth_result_t>(auth_result_t::success); |
11fdf7f2 TL |
312 | case -EAGAIN: |
313 | // cephx | |
9f95a23c | 314 | return do_auth(request_t::general); |
11fdf7f2 TL |
315 | default: |
316 | ceph_assert_always(0); | |
317 | } | |
9f95a23c TL |
318 | }).handle_exception([](auto ep) { |
319 | logger().error("authenticate_v1 failed with {}", ep); | |
f67539c2 | 320 | return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled); |
9f95a23c TL |
321 | }); |
322 | } | |
323 | ||
f67539c2 | 324 | seastar::future<Connection::auth_result_t> Connection::authenticate_v2() |
9f95a23c TL |
325 | { |
326 | auth_start = seastar::lowres_system_clock::now(); | |
327 | return conn->send(make_message<MMonGetMap>()).then([this] { | |
f67539c2 TL |
328 | auth_done.emplace(); |
329 | return auth_done->get_future(); | |
11fdf7f2 TL |
330 | }); |
331 | } | |
332 | ||
9f95a23c TL |
333 | auth::AuthClient::auth_request_t |
334 | Connection::get_auth_request(const EntityName& entity_name, | |
335 | uint32_t want_keys) | |
336 | { | |
337 | // choose method | |
338 | auth_method = [&] { | |
339 | std::vector<crimson::auth::method_t> methods; | |
340 | auth_registry.get_supported_methods(conn->get_peer_type(), &methods); | |
341 | if (methods.empty()) { | |
342 | logger().info("get_auth_request no methods is supported"); | |
343 | throw crimson::auth::error("no methods is supported"); | |
344 | } | |
345 | return methods.front(); | |
346 | }(); | |
347 | ||
348 | std::vector<uint32_t> modes; | |
349 | auth_registry.get_supported_modes(conn->get_peer_type(), auth_method, | |
350 | &modes); | |
351 | logger().info("method {} preferred_modes {}", auth_method, modes); | |
352 | if (modes.empty()) { | |
353 | throw crimson::auth::error("no modes is supported"); | |
354 | } | |
355 | auth = create_auth(auth_method, global_id, entity_name, want_keys); | |
356 | ||
357 | using ceph::encode; | |
358 | bufferlist bl; | |
359 | // initial request includes some boilerplate... | |
360 | encode((char)AUTH_MODE_MON, bl); | |
361 | encode(entity_name, bl); | |
362 | encode(global_id, bl); | |
363 | // and (maybe) some method-specific initial payload | |
364 | auth->build_initial_request(&bl); | |
365 | return {auth_method, modes, bl}; | |
366 | } | |
367 | ||
368 | tuple<CryptoKey, Connection::secret_t, bufferlist> | |
369 | Connection::handle_auth_reply_more(const ceph::buffer::list& payload) | |
370 | { | |
371 | CryptoKey session_key; | |
372 | secret_t connection_secret; | |
373 | bufferlist reply; | |
374 | auto p = payload.cbegin(); | |
375 | int r = auth->handle_response(0, p, &session_key, &connection_secret); | |
376 | if (r == -EAGAIN) { | |
377 | auth->prepare_build_request(); | |
378 | auth->build_request(reply); | |
379 | logger().info(" responding with {} bytes", reply.length()); | |
380 | return {session_key, connection_secret, reply}; | |
381 | } else if (r < 0) { | |
382 | logger().error(" handle_response returned {}", r); | |
383 | throw crimson::auth::error("unable to build auth"); | |
384 | } else { | |
385 | logger().info("authenticated!"); | |
386 | std::terminate(); | |
387 | } | |
388 | } | |
389 | ||
390 | tuple<CryptoKey, Connection::secret_t, int> | |
391 | Connection::handle_auth_done(uint64_t new_global_id, | |
392 | const ceph::buffer::list& payload) | |
393 | { | |
394 | global_id = new_global_id; | |
395 | auth->set_global_id(global_id); | |
396 | auto p = payload.begin(); | |
397 | CryptoKey session_key; | |
398 | secret_t connection_secret; | |
399 | int r = auth->handle_response(0, p, &session_key, &connection_secret); | |
400 | conn->set_last_keepalive_ack(auth_start); | |
f67539c2 TL |
401 | if (auth_done) { |
402 | auth_done->set_value(auth_result_t::success); | |
403 | auth_done.reset(); | |
404 | } | |
9f95a23c TL |
405 | return {session_key, connection_secret, r}; |
406 | } | |
407 | ||
408 | int Connection::handle_auth_bad_method(uint32_t old_auth_method, | |
409 | int result, | |
410 | const std::vector<uint32_t>& allowed_methods, | |
411 | const std::vector<uint32_t>& allowed_modes) | |
412 | { | |
413 | logger().info("old_auth_method {} result {} allowed_methods {}", | |
414 | old_auth_method, cpp_strerror(result), allowed_methods); | |
415 | std::vector<uint32_t> auth_supported; | |
416 | auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported); | |
417 | auto p = std::find(auth_supported.begin(), auth_supported.end(), | |
418 | old_auth_method); | |
419 | assert(p != auth_supported.end()); | |
420 | p = std::find_first_of(std::next(p), auth_supported.end(), | |
421 | allowed_methods.begin(), allowed_methods.end()); | |
422 | if (p == auth_supported.end()) { | |
423 | logger().error("server allowed_methods {} but i only support {}", | |
424 | allowed_methods, auth_supported); | |
f67539c2 TL |
425 | assert(auth_done); |
426 | auth_done->set_exception(std::system_error(make_error_code( | |
9f95a23c TL |
427 | crimson::net::error::negotiation_failure))); |
428 | return -EACCES; | |
429 | } | |
430 | auth_method = *p; | |
431 | logger().info("will try {} next", auth_method); | |
432 | return 0; | |
433 | } | |
434 | ||
f67539c2 | 435 | void Connection::close() |
11fdf7f2 | 436 | { |
9f95a23c TL |
437 | reply.set_value(Ref<MAuthReply>(nullptr)); |
438 | reply = {}; | |
f67539c2 TL |
439 | if (auth_done) { |
440 | auth_done->set_value(auth_result_t::canceled); | |
441 | auth_done.reset(); | |
442 | } | |
11fdf7f2 | 443 | if (conn && !std::exchange(closed, true)) { |
f67539c2 | 444 | conn->mark_down(); |
11fdf7f2 TL |
445 | } |
446 | } | |
447 | ||
448 | bool Connection::is_my_peer(const entity_addr_t& addr) const | |
449 | { | |
9f95a23c | 450 | ceph_assert(conn); |
11fdf7f2 TL |
451 | return conn->get_peer_addr() == addr; |
452 | } | |
453 | ||
9f95a23c | 454 | crimson::net::ConnectionRef Connection::get_conn() { |
11fdf7f2 TL |
455 | return conn; |
456 | } | |
457 | ||
9f95a23c TL |
458 | Client::Client(crimson::net::Messenger& messenger, |
459 | crimson::common::AuthHandler& auth_handler) | |
11fdf7f2 TL |
460 | // currently, crimson is OSD-only |
461 | : want_keys{CEPH_ENTITY_TYPE_MON | | |
462 | CEPH_ENTITY_TYPE_OSD | | |
463 | CEPH_ENTITY_TYPE_MGR}, | |
464 | timer{[this] { tick(); }}, | |
9f95a23c TL |
465 | msgr{messenger}, |
466 | auth_registry{&cct}, | |
467 | auth_handler{auth_handler} | |
11fdf7f2 TL |
468 | {} |
469 | ||
470 | Client::Client(Client&&) = default; | |
471 | Client::~Client() = default; | |
472 | ||
473 | seastar::future<> Client::start() { | |
9f95a23c TL |
474 | entity_name = crimson::common::local_conf()->name; |
475 | auth_registry.refresh_config(); | |
11fdf7f2 | 476 | return load_keyring().then([this] { |
9f95a23c | 477 | return monmap.build_initial(crimson::common::local_conf(), false); |
11fdf7f2 TL |
478 | }).then([this] { |
479 | return authenticate(); | |
9f95a23c TL |
480 | }).then([this] { |
481 | auto interval = | |
482 | std::chrono::duration_cast<seastar::lowres_clock::duration>( | |
483 | std::chrono::duration<double>( | |
484 | local_conf().get_val<double>("mon_client_ping_interval"))); | |
485 | timer.arm_periodic(interval); | |
11fdf7f2 TL |
486 | }); |
487 | } | |
488 | ||
489 | seastar::future<> Client::load_keyring() | |
490 | { | |
9f95a23c | 491 | if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) { |
11fdf7f2 TL |
492 | return seastar::now(); |
493 | } else { | |
9f95a23c TL |
494 | return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) { |
495 | return crimson::auth::load_from_keyfile(keyring); | |
11fdf7f2 | 496 | }).then([](KeyRing* keyring) { |
9f95a23c | 497 | return crimson::auth::load_from_key(keyring); |
11fdf7f2 TL |
498 | }).then([](KeyRing*) { |
499 | return seastar::now(); | |
500 | }); | |
501 | } | |
502 | } | |
503 | ||
504 | void Client::tick() | |
505 | { | |
f67539c2 | 506 | gate.dispatch_in_background(__func__, *this, [this] { |
9f95a23c TL |
507 | if (active_con) { |
508 | return seastar::when_all_succeed(active_con->get_conn()->keepalive(), | |
509 | active_con->renew_tickets(), | |
f67539c2 | 510 | active_con->renew_rotating_keyring()).then_unpack([] {}); |
9f95a23c TL |
511 | } else { |
512 | return seastar::now(); | |
513 | } | |
11fdf7f2 TL |
514 | }); |
515 | } | |
516 | ||
517 | bool Client::is_hunting() const { | |
518 | return !active_con; | |
519 | } | |
520 | ||
f67539c2 TL |
521 | std::optional<seastar::future<>> |
522 | Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) | |
11fdf7f2 | 523 | { |
f67539c2 TL |
524 | bool dispatched = true; |
525 | gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { | |
526 | // we only care about these message types | |
527 | switch (m->get_type()) { | |
528 | case CEPH_MSG_MON_MAP: | |
529 | return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m)); | |
530 | case CEPH_MSG_AUTH_REPLY: | |
531 | return handle_auth_reply( | |
532 | conn, boost::static_pointer_cast<MAuthReply>(m)); | |
533 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | |
534 | return handle_subscribe_ack( | |
535 | boost::static_pointer_cast<MMonSubscribeAck>(m)); | |
536 | case CEPH_MSG_MON_GET_VERSION_REPLY: | |
537 | return handle_get_version_reply( | |
538 | boost::static_pointer_cast<MMonGetVersionReply>(m)); | |
539 | case MSG_MON_COMMAND_ACK: | |
540 | return handle_mon_command_ack( | |
541 | boost::static_pointer_cast<MMonCommandAck>(m)); | |
542 | case MSG_LOGACK: | |
543 | return handle_log_ack( | |
544 | boost::static_pointer_cast<MLogAck>(m)); | |
545 | case MSG_CONFIG: | |
546 | return handle_config( | |
547 | boost::static_pointer_cast<MConfig>(m)); | |
548 | default: | |
549 | dispatched = false; | |
550 | return seastar::now(); | |
551 | } | |
552 | }); | |
553 | return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); | |
11fdf7f2 TL |
554 | } |
555 | ||
f67539c2 | 556 | void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */) |
11fdf7f2 | 557 | { |
f67539c2 TL |
558 | gate.dispatch_in_background(__func__, *this, [this, conn] { |
559 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
560 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
561 | return mc->is_my_peer(peer_addr); | |
562 | }); | |
563 | if (found != pending_conns.end()) { | |
564 | logger().warn("pending conn reset by {}", conn->get_peer_addr()); | |
565 | (*found)->close(); | |
566 | return seastar::now(); | |
567 | } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { | |
568 | logger().warn("active conn reset {}", conn->get_peer_addr()); | |
569 | active_con.reset(); | |
570 | return reopen_session(-1).then([this] { | |
571 | send_pendings(); | |
572 | return seastar::now(); | |
573 | }); | |
574 | } else { | |
575 | return seastar::now(); | |
576 | } | |
577 | }); | |
11fdf7f2 TL |
578 | } |
579 | ||
9f95a23c TL |
580 | std::pair<std::vector<uint32_t>, std::vector<uint32_t>> |
581 | Client::get_supported_auth_methods(int peer_type) | |
582 | { | |
583 | std::vector<uint32_t> methods; | |
584 | std::vector<uint32_t> modes; | |
585 | auth_registry.get_supported_methods(peer_type, &methods, &modes); | |
586 | return {methods, modes}; | |
587 | } | |
588 | ||
589 | uint32_t Client::pick_con_mode(int peer_type, | |
590 | uint32_t auth_method, | |
591 | const std::vector<uint32_t>& preferred_modes) | |
592 | { | |
593 | return auth_registry.pick_mode(peer_type, auth_method, preferred_modes); | |
594 | } | |
595 | ||
596 | AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type, | |
597 | int auth_method) | |
598 | { | |
599 | return auth_registry.get_handler(peer_type, auth_method); | |
600 | } | |
601 | ||
602 | ||
603 | int Client::handle_auth_request(crimson::net::ConnectionRef con, | |
604 | AuthConnectionMetaRef auth_meta, | |
605 | bool more, | |
606 | uint32_t auth_method, | |
607 | const ceph::bufferlist& payload, | |
608 | ceph::bufferlist *reply) | |
609 | { | |
610 | // for some channels prior to nautilus (osd heartbeat), we tolerate the lack of | |
611 | // an authorizer. | |
612 | if (payload.length() == 0) { | |
613 | if (con->get_messenger()->get_require_authorizer()) { | |
614 | return -EACCES; | |
615 | } else { | |
616 | auth_handler.handle_authentication({}, {}); | |
617 | return 1; | |
618 | } | |
619 | } | |
620 | auth_meta->auth_mode = payload[0]; | |
621 | if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER || | |
622 | auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) { | |
623 | return -EACCES; | |
624 | } | |
625 | AuthAuthorizeHandler* ah = get_auth_authorize_handler(con->get_peer_type(), | |
626 | auth_method); | |
627 | if (!ah) { | |
628 | logger().error("no AuthAuthorizeHandler found for auth method: {}", | |
629 | auth_method); | |
630 | return -EOPNOTSUPP; | |
631 | } | |
632 | auto authorizer_challenge = &auth_meta->authorizer_challenge; | |
f67539c2 TL |
633 | if (auth_meta->skip_authorizer_challenge) { |
634 | logger().info("skipping challenge on {}", con); | |
9f95a23c TL |
635 | authorizer_challenge = nullptr; |
636 | } | |
637 | bool was_challenge = (bool)auth_meta->authorizer_challenge; | |
638 | EntityName name; | |
639 | AuthCapsInfo caps_info; | |
640 | bool is_valid = ah->verify_authorizer( | |
641 | &cct, | |
642 | active_con->get_keys(), | |
643 | payload, | |
644 | auth_meta->get_connection_secret_length(), | |
645 | reply, | |
646 | &name, | |
647 | &active_con->get_conn()->peer_global_id, | |
648 | &caps_info, | |
649 | &auth_meta->session_key, | |
650 | &auth_meta->connection_secret, | |
651 | authorizer_challenge); | |
652 | if (is_valid) { | |
653 | auth_handler.handle_authentication(name, caps_info); | |
654 | return 1; | |
655 | } | |
656 | if (!more && !was_challenge && auth_meta->authorizer_challenge) { | |
657 | logger().info("added challenge on {}", con); | |
658 | return 0; | |
659 | } else { | |
660 | logger().info("bad authorizer on {}", con); | |
661 | return -EACCES; | |
662 | } | |
663 | } | |
664 | ||
665 | auth::AuthClient::auth_request_t | |
666 | Client::get_auth_request(crimson::net::ConnectionRef con, | |
667 | AuthConnectionMetaRef auth_meta) | |
668 | { | |
669 | logger().info("get_auth_request(con={}, auth_method={})", | |
670 | con, auth_meta->auth_method); | |
671 | // connection to mon? | |
672 | if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
673 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
674 | [peer_addr = con->get_peer_addr()](auto& mc) { | |
675 | return mc->is_my_peer(peer_addr); | |
676 | }); | |
677 | if (found == pending_conns.end()) { | |
678 | throw crimson::auth::error{"unknown connection"}; | |
679 | } | |
680 | return (*found)->get_auth_request(entity_name, want_keys); | |
681 | } else { | |
682 | // generate authorizer | |
683 | if (!active_con) { | |
684 | logger().error(" but no auth handler is set up"); | |
685 | throw crimson::auth::error("no auth available"); | |
686 | } | |
687 | auto authorizer = active_con->get_authorizer(con->get_peer_type()); | |
688 | if (!authorizer) { | |
689 | logger().error("failed to build_authorizer for type {}", | |
690 | ceph_entity_type_name(con->get_peer_type())); | |
691 | throw crimson::auth::error("unable to build auth"); | |
692 | } | |
693 | auth_meta->authorizer.reset(authorizer); | |
694 | auth_meta->auth_method = authorizer->protocol; | |
695 | vector<uint32_t> modes; | |
696 | auth_registry.get_supported_modes(con->get_peer_type(), | |
697 | auth_meta->auth_method, | |
698 | &modes); | |
699 | return {authorizer->protocol, modes, authorizer->bl}; | |
700 | } | |
701 | } | |
702 | ||
703 | ceph::bufferlist Client::handle_auth_reply_more(crimson::net::ConnectionRef conn, | |
704 | AuthConnectionMetaRef auth_meta, | |
705 | const bufferlist& bl) | |
706 | { | |
707 | if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
708 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
709 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
710 | return mc->is_my_peer(peer_addr); | |
711 | }); | |
712 | if (found == pending_conns.end()) { | |
713 | throw crimson::auth::error{"unknown connection"}; | |
714 | } | |
715 | bufferlist reply; | |
716 | tie(auth_meta->session_key, auth_meta->connection_secret, reply) = | |
717 | (*found)->handle_auth_reply_more(bl); | |
718 | return reply; | |
719 | } else { | |
720 | // authorizer challenges | |
721 | if (!active_con || !auth_meta->authorizer) { | |
722 | logger().error("no authorizer?"); | |
723 | throw crimson::auth::error("no auth available"); | |
724 | } | |
725 | auth_meta->authorizer->add_challenge(&cct, bl); | |
726 | return auth_meta->authorizer->bl; | |
727 | } | |
728 | } | |
729 | ||
730 | int Client::handle_auth_done(crimson::net::ConnectionRef conn, | |
731 | AuthConnectionMetaRef auth_meta, | |
732 | uint64_t global_id, | |
733 | uint32_t con_mode, | |
734 | const bufferlist& bl) | |
735 | { | |
736 | if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
737 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
738 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
739 | return mc->is_my_peer(peer_addr); | |
740 | }); | |
741 | if (found == pending_conns.end()) { | |
742 | return -ENOENT; | |
743 | } | |
744 | int r = 0; | |
745 | tie(auth_meta->session_key, auth_meta->connection_secret, r) = | |
746 | (*found)->handle_auth_done(global_id, bl); | |
747 | return r; | |
748 | } else { | |
749 | // verify authorizer reply | |
750 | auto p = bl.begin(); | |
751 | if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) { | |
752 | logger().error("failed verifying authorizer reply"); | |
753 | return -EACCES; | |
754 | } | |
755 | auth_meta->session_key = auth_meta->authorizer->session_key; | |
756 | return 0; | |
757 | } | |
758 | } | |
759 | ||
760 | // Handle server's indication that the previous auth attempt failed | |
761 | int Client::handle_auth_bad_method(crimson::net::ConnectionRef conn, | |
762 | AuthConnectionMetaRef auth_meta, | |
763 | uint32_t old_auth_method, | |
764 | int result, | |
765 | const std::vector<uint32_t>& allowed_methods, | |
766 | const std::vector<uint32_t>& allowed_modes) | |
767 | { | |
768 | if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
769 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), | |
770 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
771 | return mc->is_my_peer(peer_addr); | |
772 | }); | |
773 | if (found != pending_conns.end()) { | |
774 | return (*found)->handle_auth_bad_method( | |
775 | old_auth_method, result, | |
776 | allowed_methods, allowed_modes); | |
777 | } else { | |
778 | return -ENOENT; | |
779 | } | |
780 | } else { | |
781 | // huh... | |
782 | logger().info("hmm, they didn't like {} result {}", | |
783 | old_auth_method, cpp_strerror(result)); | |
784 | return -EACCES; | |
785 | } | |
786 | } | |
787 | ||
f67539c2 | 788 | seastar::future<> Client::handle_monmap(crimson::net::ConnectionRef conn, |
11fdf7f2 TL |
789 | Ref<MMonMap> m) |
790 | { | |
791 | monmap.decode(m->monmapbl); | |
792 | const auto peer_addr = conn->get_peer_addr(); | |
793 | auto cur_mon = monmap.get_name(peer_addr); | |
794 | logger().info("got monmap {}, mon.{}, is now rank {}", | |
795 | monmap.epoch, cur_mon, monmap.get_rank(cur_mon)); | |
796 | sub.got("monmap", monmap.get_epoch()); | |
797 | ||
798 | if (monmap.get_addr_name(peer_addr, cur_mon)) { | |
9f95a23c TL |
799 | if (active_con) { |
800 | logger().info("handle_monmap: renewing tickets"); | |
801 | return seastar::when_all_succeed( | |
802 | active_con->renew_tickets(), | |
f67539c2 | 803 | active_con->renew_rotating_keyring()).then_unpack([](){ |
9f95a23c TL |
804 | logger().info("handle_mon_map: renewed tickets"); |
805 | }); | |
806 | } else { | |
807 | return seastar::now(); | |
808 | } | |
11fdf7f2 TL |
809 | } else { |
810 | logger().warn("mon.{} went away", cur_mon); | |
f67539c2 TL |
811 | return reopen_session(-1).then([this] { |
812 | send_pendings(); | |
813 | return seastar::now(); | |
814 | }); | |
11fdf7f2 TL |
815 | } |
816 | } | |
817 | ||
f67539c2 TL |
818 | seastar::future<> Client::handle_auth_reply(crimson::net::ConnectionRef conn, |
819 | Ref<MAuthReply> m) | |
11fdf7f2 | 820 | { |
9f95a23c TL |
821 | logger().info( |
822 | "handle_auth_reply mon {} => {} returns {}: {}", | |
823 | conn->get_messenger()->get_myaddr(), | |
824 | conn->get_peer_addr(), *m, m->result); | |
11fdf7f2 TL |
825 | auto found = std::find_if(pending_conns.begin(), pending_conns.end(), |
826 | [peer_addr = conn->get_peer_addr()](auto& mc) { | |
9f95a23c | 827 | return mc->is_my_peer(peer_addr); |
11fdf7f2 TL |
828 | }); |
829 | if (found != pending_conns.end()) { | |
9f95a23c | 830 | return (*found)->handle_auth_reply(m); |
11fdf7f2 TL |
831 | } else if (active_con) { |
832 | return active_con->handle_auth_reply(m); | |
833 | } else { | |
834 | logger().error("unknown auth reply from {}", conn->get_peer_addr()); | |
835 | return seastar::now(); | |
836 | } | |
837 | } | |
838 | ||
839 | seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m) | |
840 | { | |
841 | sub.acked(m->interval); | |
842 | return seastar::now(); | |
843 | } | |
844 | ||
845 | Client::get_version_t Client::get_version(const std::string& map) | |
846 | { | |
847 | auto m = make_message<MMonGetVersion>(); | |
848 | auto tid = ++last_version_req_id; | |
849 | m->handle = tid; | |
850 | m->what = map; | |
851 | auto& req = version_reqs[tid]; | |
f67539c2 | 852 | return send_message(m).then([&req] { |
11fdf7f2 TL |
853 | return req.get_future(); |
854 | }); | |
855 | } | |
856 | ||
857 | seastar::future<> | |
858 | Client::handle_get_version_reply(Ref<MMonGetVersionReply> m) | |
859 | { | |
860 | if (auto found = version_reqs.find(m->handle); | |
861 | found != version_reqs.end()) { | |
862 | auto& result = found->second; | |
863 | logger().trace("{}: {} returns {}", | |
864 | __func__, m->handle, m->version); | |
f67539c2 | 865 | result.set_value(std::make_tuple(m->version, m->oldest_version)); |
11fdf7f2 TL |
866 | version_reqs.erase(found); |
867 | } else { | |
868 | logger().warn("{}: version request with handle {} not found", | |
869 | __func__, m->handle); | |
870 | } | |
871 | return seastar::now(); | |
872 | } | |
873 | ||
874 | seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m) | |
875 | { | |
876 | const auto tid = m->get_tid(); | |
877 | if (auto found = mon_commands.find(tid); | |
878 | found != mon_commands.end()) { | |
879 | auto& result = found->second; | |
880 | logger().trace("{} {}", __func__, tid); | |
f67539c2 | 881 | result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data()))); |
11fdf7f2 TL |
882 | mon_commands.erase(found); |
883 | } else { | |
884 | logger().warn("{} {} not found", __func__, tid); | |
885 | } | |
886 | return seastar::now(); | |
887 | } | |
888 | ||
889 | seastar::future<> Client::handle_log_ack(Ref<MLogAck> m) | |
890 | { | |
891 | // XXX | |
892 | return seastar::now(); | |
893 | } | |
894 | ||
895 | seastar::future<> Client::handle_config(Ref<MConfig> m) | |
896 | { | |
9f95a23c | 897 | return crimson::common::local_conf().set_mon_vals(m->config); |
11fdf7f2 TL |
898 | } |
899 | ||
900 | std::vector<unsigned> Client::get_random_mons(unsigned n) const | |
901 | { | |
902 | uint16_t min_priority = std::numeric_limits<uint16_t>::max(); | |
903 | for (const auto& m : monmap.mon_info) { | |
904 | if (m.second.priority < min_priority) { | |
905 | min_priority = m.second.priority; | |
906 | } | |
907 | } | |
908 | vector<unsigned> ranks; | |
909 | for (auto [name, info] : monmap.mon_info) { | |
11fdf7f2 TL |
910 | if (info.priority == min_priority) { |
911 | ranks.push_back(monmap.get_rank(name)); | |
912 | } | |
913 | } | |
914 | std::random_device rd; | |
915 | std::default_random_engine rng{rd()}; | |
916 | std::shuffle(ranks.begin(), ranks.end(), rng); | |
917 | if (n == 0 || n > ranks.size()) { | |
918 | return ranks; | |
919 | } else { | |
920 | return {ranks.begin(), ranks.begin() + n}; | |
921 | } | |
922 | } | |
923 | ||
924 | seastar::future<> Client::authenticate() | |
925 | { | |
f67539c2 TL |
926 | return reopen_session(-1).then([this] { |
927 | send_pendings(); | |
928 | return seastar::now(); | |
929 | }); | |
11fdf7f2 TL |
930 | } |
931 | ||
932 | seastar::future<> Client::stop() | |
933 | { | |
f67539c2 TL |
934 | logger().info("{}", __func__); |
935 | auto fut = gate.close(); | |
936 | timer.cancel(); | |
937 | for (auto& pending_con : pending_conns) { | |
938 | pending_con->close(); | |
939 | } | |
940 | if (active_con) { | |
941 | active_con->close(); | |
942 | } | |
943 | return fut; | |
11fdf7f2 TL |
944 | } |
945 | ||
946 | seastar::future<> Client::reopen_session(int rank) | |
947 | { | |
9f95a23c | 948 | logger().info("{} to mon.{}", __func__, rank); |
11fdf7f2 TL |
949 | vector<unsigned> mons; |
950 | if (rank >= 0) { | |
951 | mons.push_back(rank); | |
952 | } else { | |
953 | const auto parallel = | |
9f95a23c | 954 | crimson::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel"); |
11fdf7f2 TL |
955 | mons = get_random_mons(parallel); |
956 | } | |
957 | pending_conns.reserve(mons.size()); | |
958 | return seastar::parallel_for_each(mons, [this](auto rank) { | |
f67539c2 TL |
959 | // TODO: connect to multiple addrs |
960 | auto peer = monmap.get_addrs(rank).pick_addr(msgr.get_myaddr().get_type()); | |
961 | if (peer == entity_addr_t{}) { | |
962 | // crimson msgr only uses the first bound addr | |
963 | logger().warn("mon.{} does not have an addr compatible with me", rank); | |
964 | return seastar::now(); | |
965 | } | |
11fdf7f2 | 966 | logger().info("connecting to mon.{}", rank); |
f67539c2 TL |
967 | return seastar::futurize_invoke( |
968 | [peer, this] () -> seastar::future<Connection::auth_result_t> { | |
9f95a23c TL |
969 | auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); |
970 | auto& mc = pending_conns.emplace_back( | |
971 | std::make_unique<Connection>(auth_registry, conn, &keyring)); | |
972 | if (conn->get_peer_addr().is_msgr2()) { | |
973 | return mc->authenticate_v2(); | |
974 | } else { | |
975 | return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys) | |
976 | .handle_exception([conn](auto ep) { | |
f67539c2 TL |
977 | conn->mark_down(); |
978 | return seastar::make_exception_future<Connection::auth_result_t>(ep); | |
9f95a23c TL |
979 | }); |
980 | } | |
981 | }).then([peer, this](auto result) { | |
f67539c2 TL |
982 | if (result == Connection::auth_result_t::success) { |
983 | _finish_auth(peer); | |
9f95a23c | 984 | } |
9f95a23c TL |
985 | logger().debug("reopen_session mon connection attempts complete"); |
986 | }).handle_exception([](auto ep) { | |
987 | logger().error("mon connections failed with ep {}", ep); | |
988 | return seastar::make_exception_future(ep); | |
11fdf7f2 TL |
989 | }); |
990 | }).then([this] { | |
f67539c2 TL |
991 | if (!active_con) { |
992 | return seastar::make_exception_future( | |
993 | crimson::common::system_shutdown_exception()); | |
994 | } | |
9f95a23c | 995 | return active_con->renew_rotating_keyring(); |
11fdf7f2 TL |
996 | }); |
997 | } | |
998 | ||
f67539c2 TL |
999 | void Client::_finish_auth(const entity_addr_t& peer) |
1000 | { | |
1001 | if (!is_hunting()) { | |
1002 | return; | |
1003 | } | |
1004 | logger().info("found mon.{}", monmap.get_name(peer)); | |
1005 | ||
1006 | auto found = std::find_if( | |
1007 | pending_conns.begin(), pending_conns.end(), | |
1008 | [peer](auto& conn) { | |
1009 | return conn->is_my_peer(peer); | |
1010 | }); | |
1011 | if (found == pending_conns.end()) { | |
1012 | // Happens if another connection has won the race | |
1013 | ceph_assert(active_con && pending_conns.empty()); | |
1014 | logger().info("no pending connection for mon.{}, peer {}", | |
1015 | monmap.get_name(peer), peer); | |
1016 | return; | |
1017 | } | |
1018 | ||
1019 | ceph_assert(!active_con && !pending_conns.empty()); | |
1020 | active_con = std::move(*found); | |
1021 | found->reset(); | |
1022 | for (auto& conn : pending_conns) { | |
1023 | if (conn) { | |
1024 | conn->close(); | |
1025 | } | |
1026 | } | |
1027 | pending_conns.clear(); | |
1028 | } | |
1029 | ||
11fdf7f2 TL |
1030 | Client::command_result_t |
1031 | Client::run_command(const std::vector<std::string>& cmd, | |
1032 | const bufferlist& bl) | |
1033 | { | |
1034 | auto m = make_message<MMonCommand>(monmap.fsid); | |
1035 | auto tid = ++last_mon_command_id; | |
1036 | m->set_tid(tid); | |
1037 | m->cmd = cmd; | |
1038 | m->set_data(bl); | |
1039 | auto& req = mon_commands[tid]; | |
f67539c2 | 1040 | return send_message(m).then([&req] { |
11fdf7f2 TL |
1041 | return req.get_future(); |
1042 | }); | |
1043 | } | |
1044 | ||
1045 | seastar::future<> Client::send_message(MessageRef m) | |
1046 | { | |
f67539c2 TL |
1047 | if (active_con) { |
1048 | if (!pending_messages.empty()) { | |
1049 | send_pendings(); | |
1050 | } | |
1051 | return active_con->get_conn()->send(m); | |
1052 | } | |
1053 | auto& delayed = pending_messages.emplace_back(m); | |
1054 | return delayed.pr.get_future(); | |
1055 | } | |
1056 | ||
1057 | void Client::send_pendings() | |
1058 | { | |
1059 | if (active_con) { | |
1060 | for (auto& m : pending_messages) { | |
1061 | (void) active_con->get_conn()->send(m.msg); | |
1062 | m.pr.set_value(); | |
1063 | } | |
1064 | pending_messages.clear(); | |
1065 | } | |
11fdf7f2 TL |
1066 | } |
1067 | ||
1068 | bool Client::sub_want(const std::string& what, version_t start, unsigned flags) | |
1069 | { | |
1070 | return sub.want(what, start, flags); | |
1071 | } | |
1072 | ||
1073 | void Client::sub_got(const std::string& what, version_t have) | |
1074 | { | |
1075 | sub.got(what, have); | |
1076 | } | |
1077 | ||
1078 | void Client::sub_unwant(const std::string& what) | |
1079 | { | |
1080 | sub.unwant(what); | |
1081 | } | |
1082 | ||
1083 | bool Client::sub_want_increment(const std::string& what, | |
1084 | version_t start, | |
1085 | unsigned flags) | |
1086 | { | |
1087 | return sub.inc_want(what, start, flags); | |
1088 | } | |
1089 | ||
1090 | seastar::future<> Client::renew_subs() | |
1091 | { | |
1092 | if (!sub.have_new()) { | |
1093 | logger().warn("{} - empty", __func__); | |
1094 | return seastar::now(); | |
1095 | } | |
1096 | logger().trace("{}", __func__); | |
1097 | ||
1098 | auto m = make_message<MMonSubscribe>(); | |
1099 | m->what = sub.get_subs(); | |
1100 | m->hostname = ceph_get_short_hostname(); | |
f67539c2 | 1101 | return send_message(m).then([this] { |
11fdf7f2 TL |
1102 | sub.renewed(); |
1103 | }); | |
1104 | } | |
1105 | ||
f67539c2 TL |
1106 | void Client::print(std::ostream& out) const |
1107 | { | |
1108 | out << "mon." << entity_name; | |
1109 | } | |
1110 | ||
9f95a23c | 1111 | } // namespace crimson::mon |