git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
16
17 #include <functional>
18 #include <list>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <vector>
24
25 #include "msg/Messenger.h"
26
27 #include "MonMap.h"
28 #include "MonSub.h"
29
30 #include "common/Timer.h"
31 #include "common/Finisher.h"
32 #include "common/config.h"
33
34 #include "auth/AuthClient.h"
35 #include "auth/AuthServer.h"
36
37 class MMonMap;
38 class MConfig;
39 class MMonGetVersionReply;
40 struct MMonSubscribeAck;
41 class MMonCommandAck;
42 struct MAuthReply;
43 class LogClient;
44 class AuthAuthorizer;
45 class AuthClientHandler;
46 class AuthRegistry;
47 class KeyRing;
48 class RotatingKeyRing;
49
50 class MonConnection {
51 public:
52 MonConnection(CephContext *cct,
53 ConnectionRef conn,
54 uint64_t global_id,
55 AuthRegistry *auth_registry);
56 ~MonConnection();
57 MonConnection(MonConnection&& rhs) = default;
58 MonConnection& operator=(MonConnection&&) = default;
59 MonConnection(const MonConnection& rhs) = delete;
60 MonConnection& operator=(const MonConnection&) = delete;
61 int handle_auth(MAuthReply *m,
62 const EntityName& entity_name,
63 uint32_t want_keys,
64 RotatingKeyRing* keyring);
65 int authenticate(MAuthReply *m);
66 void start(epoch_t epoch,
67 const EntityName& entity_name);
68 bool have_session() const;
69 uint64_t get_global_id() const {
70 return global_id;
71 }
72 ConnectionRef get_con() {
73 return con;
74 }
75 std::unique_ptr<AuthClientHandler>& get_auth() {
76 return auth;
77 }
78
79 int get_auth_request(
80 uint32_t *method,
81 std::vector<uint32_t> *preferred_modes,
82 ceph::buffer::list *out,
83 const EntityName& entity_name,
84 uint32_t want_keys,
85 RotatingKeyRing* keyring);
86 int handle_auth_reply_more(
87 AuthConnectionMeta *auth_meta,
88 const ceph::buffer::list& bl,
89 ceph::buffer::list *reply);
90 int handle_auth_done(
91 AuthConnectionMeta *auth_meta,
92 uint64_t global_id,
93 const ceph::buffer::list& bl,
94 CryptoKey *session_key,
95 std::string *connection_secret);
96 int handle_auth_bad_method(
97 uint32_t old_auth_method,
98 int result,
99 const std::vector<uint32_t>& allowed_methods,
100 const std::vector<uint32_t>& allowed_modes);
101
102 bool is_con(Connection *c) const {
103 return con.get() == c;
104 }
105 void queue_command(Message *m) {
106 pending_tell_command = m;
107 }
108
109 private:
110 int _negotiate(MAuthReply *m,
111 const EntityName& entity_name,
112 uint32_t want_keys,
113 RotatingKeyRing* keyring);
114 int _init_auth(uint32_t method,
115 const EntityName& entity_name,
116 uint32_t want_keys,
117 RotatingKeyRing* keyring,
118 bool msgr2);
119
120 private:
121 CephContext *cct;
122 enum class State {
123 NONE,
124 NEGOTIATING, // v1 only
125 AUTHENTICATING, // v1 and v2
126 HAVE_SESSION,
127 };
128 State state = State::NONE;
129 ConnectionRef con;
130 int auth_method = -1;
131 utime_t auth_start;
132
133 std::unique_ptr<AuthClientHandler> auth;
134 uint64_t global_id;
135
136 MessageRef pending_tell_command;
137
138 AuthRegistry *auth_registry;
139 };
140
141
142 struct MonClientPinger : public Dispatcher,
143 public AuthClient {
144
145 ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
146 ceph::condition_variable ping_recvd_cond;
147 std::string *result;
148 bool done;
149 RotatingKeyRing *keyring;
150 std::unique_ptr<MonConnection> mc;
151
152 MonClientPinger(CephContext *cct_,
153 RotatingKeyRing *keyring,
154 std::string *res_) :
155 Dispatcher(cct_),
156 result(res_),
157 done(false),
158 keyring(keyring)
159 { }
160
161 int wait_for_reply(double timeout = 0.0) {
162 std::unique_lock locker{lock};
163 if (timeout <= 0) {
164 timeout = cct->_conf->client_mount_timeout;
165 }
166 done = false;
167 if (ping_recvd_cond.wait_for(locker,
168 ceph::make_timespan(timeout),
169 [this] { return done; })) {
170 return 0;
171 } else {
172 return ETIMEDOUT;
173 }
174 }
175
176 bool ms_dispatch(Message *m) override {
177 using ceph::decode;
178 std::lock_guard l(lock);
179 if (m->get_type() != CEPH_MSG_PING)
180 return false;
181
182 ceph::buffer::list &payload = m->get_payload();
183 if (result && payload.length() > 0) {
184 auto p = std::cbegin(payload);
185 decode(*result, p);
186 }
187 done = true;
188 ping_recvd_cond.notify_all();
189 m->put();
190 return true;
191 }
192 bool ms_handle_reset(Connection *con) override {
193 std::lock_guard l(lock);
194 done = true;
195 ping_recvd_cond.notify_all();
196 return true;
197 }
198 void ms_handle_remote_reset(Connection *con) override {}
199 bool ms_handle_refused(Connection *con) override {
200 return false;
201 }
202
203 // AuthClient
204 int get_auth_request(
205 Connection *con,
206 AuthConnectionMeta *auth_meta,
207 uint32_t *auth_method,
208 std::vector<uint32_t> *preferred_modes,
209 ceph::buffer::list *bl) override {
210 return mc->get_auth_request(auth_method, preferred_modes, bl,
211 cct->_conf->name, 0, keyring);
212 }
213 int handle_auth_reply_more(
214 Connection *con,
215 AuthConnectionMeta *auth_meta,
216 const ceph::buffer::list& bl,
217 ceph::buffer::list *reply) override {
218 return mc->handle_auth_reply_more(auth_meta, bl, reply);
219 }
220 int handle_auth_done(
221 Connection *con,
222 AuthConnectionMeta *auth_meta,
223 uint64_t global_id,
224 uint32_t con_mode,
225 const ceph::buffer::list& bl,
226 CryptoKey *session_key,
227 std::string *connection_secret) override {
228 return mc->handle_auth_done(auth_meta, global_id, bl,
229 session_key, connection_secret);
230 }
231 int handle_auth_bad_method(
232 Connection *con,
233 AuthConnectionMeta *auth_meta,
234 uint32_t old_auth_method,
235 int result,
236 const std::vector<uint32_t>& allowed_methods,
237 const std::vector<uint32_t>& allowed_modes) override {
238 return mc->handle_auth_bad_method(old_auth_method, result,
239 allowed_methods, allowed_modes);
240 }
241 };
242
243
244 class MonClient : public Dispatcher,
245 public AuthClient,
246 public AuthServer /* for mgr, osd, mds */ {
247 public:
248 MonMap monmap;
249 std::map<std::string,std::string> config_mgr;
250
251 private:
252 Messenger *messenger;
253
254 std::unique_ptr<MonConnection> active_con;
255 std::map<entity_addrvec_t, MonConnection> pending_cons;
256 std::set<unsigned> tried;
257
258 EntityName entity_name;
259
260 mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
261 SafeTimer timer;
262 Finisher finisher;
263
264 bool initialized;
265 bool stopping = false;
266
267 LogClient *log_client;
268 bool more_log_pending;
269
270 void send_log(bool flush = false);
271
272 bool ms_dispatch(Message *m) override;
273 bool ms_handle_reset(Connection *con) override;
274 void ms_handle_remote_reset(Connection *con) override {}
275 bool ms_handle_refused(Connection *con) override { return false; }
276
277 void handle_monmap(MMonMap *m);
278 void handle_config(MConfig *m);
279
280 void handle_auth(MAuthReply *m);
281
282 // monitor session
283 utime_t last_keepalive;
284 utime_t last_send_log;
285
286 void tick();
287 void schedule_tick();
288
289 // monclient
290 bool want_monmap;
291 ceph::condition_variable map_cond;
292 bool passthrough_monmap = false;
293 bool got_config = false;
294
295 // authenticate
296 std::unique_ptr<AuthClientHandler> auth;
297 uint32_t want_keys = 0;
298 uint64_t global_id = 0;
299 ceph::condition_variable auth_cond;
300 int authenticate_err = 0;
301 bool authenticated = false;
302
303 std::list<MessageRef> waiting_for_session;
304 utime_t last_rotating_renew_sent;
305 std::unique_ptr<Context> session_established_context;
306 bool had_a_connection;
307 double reopen_interval_multiplier;
308
309 Dispatcher *handle_authentication_dispatcher = nullptr;
310
311 bool _opened() const;
312 bool _hunting() const;
313 void _start_hunting();
314 void _finish_hunting(int auth_err);
315 void _finish_auth(int auth_err);
316 void _reopen_session(int rank = -1);
317 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
318 void _un_backoff();
319 void _add_conns(uint64_t global_id);
320 void _send_mon_message(MessageRef m);
321
322 std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
323 const ConnectionRef& con) {
324 for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) {
325 if (i->second.get_con() == con) {
326 return i;
327 }
328 }
329 return pending_cons.end();
330 }
331
332 public:
333 // AuthClient
334 int get_auth_request(
335 Connection *con,
336 AuthConnectionMeta *auth_meta,
337 uint32_t *method,
338 std::vector<uint32_t> *preferred_modes,
339 ceph::buffer::list *bl) override;
340 int handle_auth_reply_more(
341 Connection *con,
342 AuthConnectionMeta *auth_meta,
343 const ceph::buffer::list& bl,
344 ceph::buffer::list *reply) override;
345 int handle_auth_done(
346 Connection *con,
347 AuthConnectionMeta *auth_meta,
348 uint64_t global_id,
349 uint32_t con_mode,
350 const ceph::buffer::list& bl,
351 CryptoKey *session_key,
352 std::string *connection_secret) override;
353 int handle_auth_bad_method(
354 Connection *con,
355 AuthConnectionMeta *auth_meta,
356 uint32_t old_auth_method,
357 int result,
358 const std::vector<uint32_t>& allowed_methods,
359 const std::vector<uint32_t>& allowed_modes) override;
360 // AuthServer
361 int handle_auth_request(
362 Connection *con,
363 AuthConnectionMeta *auth_meta,
364 bool more,
365 uint32_t auth_method,
366 const ceph::buffer::list& bl,
367 ceph::buffer::list *reply);
368
369 void set_entity_name(EntityName name) { entity_name = name; }
370 void set_handle_authentication_dispatcher(Dispatcher *d) {
371 handle_authentication_dispatcher = d;
372 }
373 int _check_auth_tickets();
374 int _check_auth_rotating();
375 int wait_auth_rotating(double timeout);
376
377 int authenticate(double timeout=0.0);
378 bool is_authenticated() const {return authenticated;}
379
380 bool is_connected() const { return active_con != nullptr; }
381
382 /**
383 * Try to flush as many log messages as we can in a single
384 * message. Use this before shutting down to transmit your
385 * last message.
386 */
387 void flush_log();
388
389 private:
390 // mon subscriptions
391 MonSub sub;
392 void _renew_subs();
393 void handle_subscribe_ack(MMonSubscribeAck* m);
394
395 public:
396 void renew_subs() {
397 std::lock_guard l(monc_lock);
398 _renew_subs();
399 }
400 bool sub_want(std::string what, version_t start, unsigned flags) {
401 std::lock_guard l(monc_lock);
402 return sub.want(what, start, flags);
403 }
404 void sub_got(std::string what, version_t have) {
405 std::lock_guard l(monc_lock);
406 sub.got(what, have);
407 }
408 void sub_unwant(std::string what) {
409 std::lock_guard l(monc_lock);
410 sub.unwant(what);
411 }
412 bool sub_want_increment(std::string what, version_t start, unsigned flags) {
413 std::lock_guard l(monc_lock);
414 return sub.inc_want(what, start, flags);
415 }
416
417 std::unique_ptr<KeyRing> keyring;
418 std::unique_ptr<RotatingKeyRing> rotating_secrets;
419
420 public:
421 explicit MonClient(CephContext *cct_);
422 MonClient(const MonClient &) = delete;
423 MonClient& operator=(const MonClient &) = delete;
424 ~MonClient() override;
425
426 int init();
427 void shutdown();
428
429 void set_log_client(LogClient *clog) {
430 log_client = clog;
431 }
432 LogClient *get_log_client() {
433 return log_client;
434 }
435
436 int build_initial_monmap();
437 int get_monmap();
438 int get_monmap_and_config();
439 /**
440 * If you want to see MonMap messages, set this and
441 * the MonClient will tell the Messenger it hasn't
442 * dealt with it.
443 * Note that if you do this, *you* are of course responsible for
444 * putting the message reference!
445 */
446 void set_passthrough_monmap() {
447 std::lock_guard l(monc_lock);
448 passthrough_monmap = true;
449 }
450 void unset_passthrough_monmap() {
451 std::lock_guard l(monc_lock);
452 passthrough_monmap = false;
453 }
454 /**
455 * Ping monitor with ID @p mon_id and record the resulting
456 * reply in @p result_reply.
457 *
458 * @param[in] mon_id Target monitor's ID
459 * @param[out] result_reply reply from mon.ID, if param != NULL
460 * @returns 0 in case of success; < 0 in case of error,
461 * -ETIMEDOUT if monitor didn't reply before timeout
462 * expired (default: conf->client_mount_timeout).
463 */
464 int ping_monitor(const std::string &mon_id, std::string *result_reply);
465
466 void send_mon_message(Message *m) {
467 send_mon_message(MessageRef{m, false});
468 }
469 void send_mon_message(MessageRef m);
470 /**
471 * If you specify a callback, you should not call
472 * reopen_session() again until it has been triggered. The MonClient
473 * will behave, but the first callback could be triggered after
474 * the session has been killed and the MonClient has started trying
475 * to reconnect to another monitor.
476 */
477 void reopen_session(Context *cb=NULL) {
478 std::lock_guard l(monc_lock);
479 if (cb) {
480 session_established_context.reset(cb);
481 }
482 _reopen_session();
483 }
484
485 const uuid_d& get_fsid() const {
486 return monmap.fsid;
487 }
488
489 entity_addrvec_t get_mon_addrs(unsigned i) const {
490 std::lock_guard l(monc_lock);
491 if (i < monmap.size())
492 return monmap.get_addrs(i);
493 return entity_addrvec_t();
494 }
495 int get_num_mon() const {
496 std::lock_guard l(monc_lock);
497 return monmap.size();
498 }
499
500 uint64_t get_global_id() const {
501 std::lock_guard l(monc_lock);
502 return global_id;
503 }
504
505 void set_messenger(Messenger *m) { messenger = m; }
506 entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); }
507 AuthAuthorizer* build_authorizer(int service_id) const;
508
509 void set_want_keys(uint32_t want) {
510 want_keys = want;
511 }
512
513 // admin commands
514 private:
515 uint64_t last_mon_command_tid;
516
517 struct MonCommand {
518 // for tell only
519 std::string target_name;
520 int target_rank;
521 ConnectionRef target_con;
522 std::unique_ptr<MonConnection> target_session;
523 unsigned send_attempts = 0; ///< attempt count for legacy mons
524 utime_t last_send_attempt;
525
526 uint64_t tid;
527 std::vector<std::string> cmd;
528 ceph::buffer::list inbl;
529 ceph::buffer::list *poutbl;
530 std::string *prs;
531 int *prval;
532 Context *onfinish, *ontimeout;
533
534 explicit MonCommand(uint64_t t)
535 : target_rank(-1),
536 tid(t),
537 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
538 {}
539
540 bool is_tell() const {
541 return target_name.size() || target_rank >= 0;
542 }
543 };
544 std::map<uint64_t,MonCommand*> mon_commands;
545
546 void _send_command(MonCommand *r);
547 void _check_tell_commands();
548 void _resend_mon_commands();
549 int _cancel_mon_command(uint64_t tid);
550 void _finish_command(MonCommand *r, int ret, std::string rs);
551 void _finish_auth();
552 void handle_mon_command_ack(MMonCommandAck *ack);
553 void handle_command_reply(MCommandReply *reply);
554
555 public:
556 void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
557 ceph::buffer::list *outbl, std::string *outs,
558 Context *onfinish);
559 void start_mon_command(int mon_rank,
560 const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
561 ceph::buffer::list *outbl, std::string *outs,
562 Context *onfinish);
563 void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
564 const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
565 ceph::buffer::list *outbl, std::string *outs,
566 Context *onfinish);
567
568 // version requests
569 public:
570 /**
571 * get latest known version(s) of cluster map
572 *
573 * @param map std::string name of map (e.g., 'osdmap')
574 * @param newest pointer where newest map version will be stored
575 * @param oldest pointer where oldest map version will be stored
576 * @param onfinish context that will be triggered on completion
577 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
578 */
579 void get_version(std::string map, version_t *newest, version_t *oldest, Context *onfinish);
580 /**
581 * Run a callback within our lock, with a reference
582 * to the MonMap
583 */
584 template<typename Callback, typename...Args>
585 auto with_monmap(Callback&& cb, Args&&...args) const ->
586 decltype(cb(monmap, std::forward<Args>(args)...)) {
587 std::lock_guard l(monc_lock);
588 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
589 }
590
591 void register_config_callback(md_config_t::config_callback fn);
592 void register_config_notify_callback(std::function<void(void)> f) {
593 config_notify_cb = f;
594 }
595 md_config_t::config_callback get_config_callback();
596
597 private:
598 struct version_req_d {
599 Context *context;
600 version_t *newest, *oldest;
601 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
602 };
603
604 std::map<ceph_tid_t, version_req_d*> version_requests;
605 ceph_tid_t version_req_id;
606 void handle_get_version_reply(MMonGetVersionReply* m);
607
608 md_config_t::config_callback config_cb;
609 std::function<void(void)> config_notify_cb;
610 };
611
612 #endif