]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
update sources to v12.1.0
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
17
18 #include <memory>
19
20 #include "msg/Messenger.h"
21
22 #include "MonMap.h"
23
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
27
28
29 class MMonMap;
30 class MMonGetVersionReply;
31 struct MMonSubscribeAck;
32 class MMonCommandAck;
33 struct MAuthReply;
34 class MAuthRotating;
35 class LogClient;
36 struct AuthAuthorizer;
37 class AuthMethodList;
38 class AuthClientHandler;
39 class KeyRing;
40 class RotatingKeyRing;
41
42 struct MonClientPinger : public Dispatcher {
43
44 Mutex lock;
45 Cond ping_recvd_cond;
46 string *result;
47 bool done;
48
49 MonClientPinger(CephContext *cct_, string *res_) :
50 Dispatcher(cct_),
51 lock("MonClientPinger::lock"),
52 result(res_),
53 done(false)
54 { }
55
56 int wait_for_reply(double timeout = 0.0) {
57 utime_t until = ceph_clock_now();
58 until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
59 done = false;
60
61 int ret = 0;
62 while (!done) {
63 ret = ping_recvd_cond.WaitUntil(lock, until);
64 if (ret == ETIMEDOUT)
65 break;
66 }
67 return ret;
68 }
69
70 bool ms_dispatch(Message *m) override {
71 Mutex::Locker l(lock);
72 if (m->get_type() != CEPH_MSG_PING)
73 return false;
74
75 bufferlist &payload = m->get_payload();
76 if (result && payload.length() > 0) {
77 bufferlist::iterator p = payload.begin();
78 ::decode(*result, p);
79 }
80 done = true;
81 ping_recvd_cond.SignalAll();
82 m->put();
83 return true;
84 }
85 bool ms_handle_reset(Connection *con) override {
86 Mutex::Locker l(lock);
87 done = true;
88 ping_recvd_cond.SignalAll();
89 return true;
90 }
91 void ms_handle_remote_reset(Connection *con) override {}
92 bool ms_handle_refused(Connection *con) override {
93 return false;
94 }
95 };
96
97 class MonConnection {
98 public:
99 MonConnection(CephContext *cct,
100 ConnectionRef conn,
101 uint64_t global_id);
102 ~MonConnection();
103 MonConnection(MonConnection&& rhs) = default;
104 MonConnection& operator=(MonConnection&&) = default;
105 MonConnection(const MonConnection& rhs) = delete;
106 MonConnection& operator=(const MonConnection&) = delete;
107 int handle_auth(MAuthReply *m,
108 const EntityName& entity_name,
109 uint32_t want_keys,
110 RotatingKeyRing* keyring);
111 int authenticate(MAuthReply *m);
112 void start(epoch_t epoch,
113 const EntityName& entity_name,
114 const AuthMethodList& auth_supported);
115 bool have_session() const;
116 uint64_t get_global_id() const {
117 return global_id;
118 }
119 ConnectionRef get_con() {
120 return con;
121 }
122 std::unique_ptr<AuthClientHandler>& get_auth() {
123 return auth;
124 }
125
126 private:
127 int _negotiate(MAuthReply *m,
128 const EntityName& entity_name,
129 uint32_t want_keys,
130 RotatingKeyRing* keyring);
131
132 private:
133 CephContext *cct;
134 enum class State {
135 NONE,
136 NEGOTIATING,
137 AUTHENTICATING,
138 HAVE_SESSION,
139 };
140 State state = State::NONE;
141 ConnectionRef con;
142
143 std::unique_ptr<AuthClientHandler> auth;
144 uint64_t global_id;
145 };
146
147 class MonClient : public Dispatcher {
148 public:
149 MonMap monmap;
150 private:
151 Messenger *messenger;
152
153 std::unique_ptr<MonConnection> active_con;
154 std::map<entity_addr_t, MonConnection> pending_cons;
155
156 EntityName entity_name;
157
158 entity_addr_t my_addr;
159
160 mutable Mutex monc_lock;
161 SafeTimer timer;
162 Finisher finisher;
163
164 bool initialized;
165 bool no_keyring_disabled_cephx;
166
167 LogClient *log_client;
168 bool more_log_pending;
169
170 void send_log(bool flush = false);
171
172 std::unique_ptr<AuthMethodList> auth_supported;
173
174 bool ms_dispatch(Message *m) override;
175 bool ms_handle_reset(Connection *con) override;
176 void ms_handle_remote_reset(Connection *con) override {}
177 bool ms_handle_refused(Connection *con) override { return false; }
178
179 void handle_monmap(MMonMap *m);
180
181 void handle_auth(MAuthReply *m);
182
183 // monitor session
184 void tick();
185 void schedule_tick();
186
187 // monclient
188 bool want_monmap;
189 Cond map_cond;
190 bool passthrough_monmap = false;
191
192 // authenticate
193 std::unique_ptr<AuthClientHandler> auth;
194 uint32_t want_keys = 0;
195 uint64_t global_id = 0;
196 Cond auth_cond;
197 int authenticate_err = 0;
198 bool authenticated = false;
199
200 list<Message*> waiting_for_session;
201 utime_t last_rotating_renew_sent;
202 std::unique_ptr<Context> session_established_context;
203 bool had_a_connection;
204 double reopen_interval_multiplier;
205
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err);
211 void _reopen_session(int rank = -1);
212 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
213 void _add_conns(uint64_t global_id);
214 void _send_mon_message(Message *m);
215
216 public:
217 void set_entity_name(EntityName name) { entity_name = name; }
218
219 int _check_auth_tickets();
220 int _check_auth_rotating();
221 int wait_auth_rotating(double timeout);
222
223 int authenticate(double timeout=0.0);
224 bool is_authenticated() const {return authenticated;}
225
226 /**
227 * Try to flush as many log messages as we can in a single
228 * message. Use this before shutting down to transmit your
229 * last message.
230 */
231 void flush_log();
232
233 // mon subscriptions
234 private:
235 map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
236 map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
237 utime_t sub_renew_sent, sub_renew_after;
238
239 void _renew_subs();
240 void handle_subscribe_ack(MMonSubscribeAck* m);
241
242 bool _sub_want(const string &what, version_t start, unsigned flags) {
243 auto sub = sub_new.find(what);
244 if (sub != sub_new.end() &&
245 sub->second.start == start &&
246 sub->second.flags == flags) {
247 return false;
248 } else {
249 sub = sub_sent.find(what);
250 if (sub != sub_sent.end() &&
251 sub->second.start == start &&
252 sub->second.flags == flags)
253 return false;
254 }
255
256 sub_new[what].start = start;
257 sub_new[what].flags = flags;
258 return true;
259 }
260 void _sub_got(const string &what, version_t got) {
261 if (sub_new.count(what)) {
262 if (sub_new[what].start <= got) {
263 if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
264 sub_new.erase(what);
265 else
266 sub_new[what].start = got + 1;
267 }
268 } else if (sub_sent.count(what)) {
269 if (sub_sent[what].start <= got) {
270 if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
271 sub_sent.erase(what);
272 else
273 sub_sent[what].start = got + 1;
274 }
275 }
276 }
277 void _sub_unwant(const string &what) {
278 sub_sent.erase(what);
279 sub_new.erase(what);
280 }
281
282 public:
283 void renew_subs() {
284 Mutex::Locker l(monc_lock);
285 _renew_subs();
286 }
287 bool sub_want(string what, version_t start, unsigned flags) {
288 Mutex::Locker l(monc_lock);
289 return _sub_want(what, start, flags);
290 }
291 void sub_got(string what, version_t have) {
292 Mutex::Locker l(monc_lock);
293 _sub_got(what, have);
294 }
295 void sub_unwant(string what) {
296 Mutex::Locker l(monc_lock);
297 _sub_unwant(what);
298 }
299 /**
300 * Increase the requested subscription start point. If you do increase
301 * the value, apply the passed-in flags as well; otherwise do nothing.
302 */
303 bool sub_want_increment(string what, version_t start, unsigned flags) {
304 Mutex::Locker l(monc_lock);
305 map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
306 if (i != sub_new.end()) {
307 if (i->second.start >= start)
308 return false;
309 i->second.start = start;
310 i->second.flags = flags;
311 return true;
312 }
313
314 i = sub_sent.find(what);
315 if (i == sub_sent.end() || i->second.start < start) {
316 ceph_mon_subscribe_item& item = sub_new[what];
317 item.start = start;
318 item.flags = flags;
319 return true;
320 }
321 return false;
322 }
323
324 std::unique_ptr<KeyRing> keyring;
325 std::unique_ptr<RotatingKeyRing> rotating_secrets;
326
327 public:
328 explicit MonClient(CephContext *cct_);
329 MonClient(const MonClient &) = delete;
330 MonClient& operator=(const MonClient &) = delete;
331 ~MonClient() override;
332
333 int init();
334 void shutdown();
335
336 void set_log_client(LogClient *clog) {
337 log_client = clog;
338 }
339
340 int build_initial_monmap();
341 int get_monmap();
342 int get_monmap_privately();
343 /**
344 * If you want to see MonMap messages, set this and
345 * the MonClient will tell the Messenger it hasn't
346 * dealt with it.
347 * Note that if you do this, *you* are of course responsible for
348 * putting the message reference!
349 */
350 void set_passthrough_monmap() {
351 Mutex::Locker l(monc_lock);
352 passthrough_monmap = true;
353 }
354 void unset_passthrough_monmap() {
355 Mutex::Locker l(monc_lock);
356 passthrough_monmap = false;
357 }
358 /**
359 * Ping monitor with ID @p mon_id and record the resulting
360 * reply in @p result_reply.
361 *
362 * @param[in] mon_id Target monitor's ID
363 * @param[out] result_reply reply from mon.ID, if param != NULL
364 * @returns 0 in case of success; < 0 in case of error,
365 * -ETIMEDOUT if monitor didn't reply before timeout
366 * expired (default: conf->client_mount_timeout).
367 */
368 int ping_monitor(const string &mon_id, string *result_reply);
369
370 void send_mon_message(Message *m) {
371 Mutex::Locker l(monc_lock);
372 _send_mon_message(m);
373 }
374 /**
375 * If you specify a callback, you should not call
376 * reopen_session() again until it has been triggered. The MonClient
377 * will behave, but the first callback could be triggered after
378 * the session has been killed and the MonClient has started trying
379 * to reconnect to another monitor.
380 */
381 void reopen_session(Context *cb=NULL) {
382 Mutex::Locker l(monc_lock);
383 if (cb) {
384 session_established_context.reset(cb);
385 }
386 _reopen_session();
387 }
388
389 entity_addr_t get_my_addr() const {
390 return my_addr;
391 }
392
393 const uuid_d& get_fsid() const {
394 return monmap.fsid;
395 }
396
397 entity_addr_t get_mon_addr(unsigned i) const {
398 Mutex::Locker l(monc_lock);
399 if (i < monmap.size())
400 return monmap.get_addr(i);
401 return entity_addr_t();
402 }
403 entity_inst_t get_mon_inst(unsigned i) const {
404 Mutex::Locker l(monc_lock);
405 if (i < monmap.size())
406 return monmap.get_inst(i);
407 return entity_inst_t();
408 }
409 int get_num_mon() const {
410 Mutex::Locker l(monc_lock);
411 return monmap.size();
412 }
413
414 uint64_t get_global_id() const {
415 Mutex::Locker l(monc_lock);
416 return global_id;
417 }
418
419 void set_messenger(Messenger *m) { messenger = m; }
420 entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
421 AuthAuthorizer* build_authorizer(int service_id) const;
422
423 void set_want_keys(uint32_t want) {
424 want_keys = want;
425 }
426
427 // admin commands
428 private:
429 uint64_t last_mon_command_tid;
430 struct MonCommand {
431 string target_name;
432 int target_rank;
433 uint64_t tid;
434 vector<string> cmd;
435 bufferlist inbl;
436 bufferlist *poutbl;
437 string *prs;
438 int *prval;
439 Context *onfinish, *ontimeout;
440
441 explicit MonCommand(uint64_t t)
442 : target_rank(-1),
443 tid(t),
444 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
445 {}
446 };
447 map<uint64_t,MonCommand*> mon_commands;
448
449 void _send_command(MonCommand *r);
450 void _resend_mon_commands();
451 int _cancel_mon_command(uint64_t tid);
452 void _finish_command(MonCommand *r, int ret, string rs);
453 void _finish_auth();
454 void handle_mon_command_ack(MMonCommandAck *ack);
455
456 public:
457 void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
458 bufferlist *outbl, string *outs,
459 Context *onfinish);
460 void start_mon_command(int mon_rank,
461 const vector<string>& cmd, const bufferlist& inbl,
462 bufferlist *outbl, string *outs,
463 Context *onfinish);
464 void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
465 const vector<string>& cmd, const bufferlist& inbl,
466 bufferlist *outbl, string *outs,
467 Context *onfinish);
468
469 // version requests
470 public:
471 /**
472 * get latest known version(s) of cluster map
473 *
474 * @param map string name of map (e.g., 'osdmap')
475 * @param newest pointer where newest map version will be stored
476 * @param oldest pointer where oldest map version will be stored
477 * @param onfinish context that will be triggered on completion
478 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
479 */
480 void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
481
482 /**
483 * Run a callback within our lock, with a reference
484 * to the MonMap
485 */
486 template<typename Callback, typename...Args>
487 auto with_monmap(Callback&& cb, Args&&...args) const ->
488 decltype(cb(monmap, std::forward<Args>(args)...)) {
489 Mutex::Locker l(monc_lock);
490 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
491 }
492
493 private:
494 struct version_req_d {
495 Context *context;
496 version_t *newest, *oldest;
497 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
498 };
499
500 map<ceph_tid_t, version_req_d*> version_requests;
501 ceph_tid_t version_req_id;
502 void handle_get_version_reply(MMonGetVersionReply* m);
503
504
505 };
506
507 #endif