]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonClient.h
bump version to 12.2.5-pve1
[ceph.git] / ceph / src / mon / MonClient.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_MONCLIENT_H
16#define CEPH_MONCLIENT_H
17
18#include <memory>
19
20#include "msg/Messenger.h"
21
22#include "MonMap.h"
23
24#include "common/Timer.h"
25#include "common/Finisher.h"
26#include "common/config.h"
27
28
29class MMonMap;
30class MMonGetVersionReply;
31struct MMonSubscribeAck;
32class MMonCommandAck;
33struct MAuthReply;
34class MAuthRotating;
35class LogClient;
36struct AuthAuthorizer;
37class AuthMethodList;
38class AuthClientHandler;
39class KeyRing;
40class RotatingKeyRing;
41
42struct MonClientPinger : public Dispatcher {
43
44 Mutex lock;
45 Cond ping_recvd_cond;
46 string *result;
47 bool done;
48
49 MonClientPinger(CephContext *cct_, string *res_) :
50 Dispatcher(cct_),
51 lock("MonClientPinger::lock"),
52 result(res_),
53 done(false)
54 { }
55
56 int wait_for_reply(double timeout = 0.0) {
57 utime_t until = ceph_clock_now();
58 until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
59 done = false;
60
61 int ret = 0;
62 while (!done) {
63 ret = ping_recvd_cond.WaitUntil(lock, until);
64 if (ret == ETIMEDOUT)
65 break;
66 }
67 return ret;
68 }
69
70 bool ms_dispatch(Message *m) override {
71 Mutex::Locker l(lock);
72 if (m->get_type() != CEPH_MSG_PING)
73 return false;
74
75 bufferlist &payload = m->get_payload();
76 if (result && payload.length() > 0) {
77 bufferlist::iterator p = payload.begin();
78 ::decode(*result, p);
79 }
80 done = true;
81 ping_recvd_cond.SignalAll();
82 m->put();
83 return true;
84 }
85 bool ms_handle_reset(Connection *con) override {
86 Mutex::Locker l(lock);
87 done = true;
88 ping_recvd_cond.SignalAll();
89 return true;
90 }
91 void ms_handle_remote_reset(Connection *con) override {}
92 bool ms_handle_refused(Connection *con) override {
93 return false;
94 }
95};
96
97class MonConnection {
98public:
99 MonConnection(CephContext *cct,
100 ConnectionRef conn,
101 uint64_t global_id);
102 ~MonConnection();
103 MonConnection(MonConnection&& rhs) = default;
104 MonConnection& operator=(MonConnection&&) = default;
105 MonConnection(const MonConnection& rhs) = delete;
106 MonConnection& operator=(const MonConnection&) = delete;
107 int handle_auth(MAuthReply *m,
108 const EntityName& entity_name,
109 uint32_t want_keys,
110 RotatingKeyRing* keyring);
111 int authenticate(MAuthReply *m);
112 void start(epoch_t epoch,
113 const EntityName& entity_name,
114 const AuthMethodList& auth_supported);
115 bool have_session() const;
116 uint64_t get_global_id() const {
117 return global_id;
118 }
119 ConnectionRef get_con() {
120 return con;
121 }
122 std::unique_ptr<AuthClientHandler>& get_auth() {
123 return auth;
124 }
125
126private:
127 int _negotiate(MAuthReply *m,
128 const EntityName& entity_name,
129 uint32_t want_keys,
130 RotatingKeyRing* keyring);
131
132private:
133 CephContext *cct;
134 enum class State {
135 NONE,
136 NEGOTIATING,
137 AUTHENTICATING,
138 HAVE_SESSION,
139 };
140 State state = State::NONE;
141 ConnectionRef con;
142
143 std::unique_ptr<AuthClientHandler> auth;
144 uint64_t global_id;
145};
146
147class MonClient : public Dispatcher {
148public:
149 MonMap monmap;
150private:
151 Messenger *messenger;
152
153 std::unique_ptr<MonConnection> active_con;
154 std::map<entity_addr_t, MonConnection> pending_cons;
155
156 EntityName entity_name;
157
158 entity_addr_t my_addr;
159
160 mutable Mutex monc_lock;
161 SafeTimer timer;
162 Finisher finisher;
163
164 bool initialized;
165 bool no_keyring_disabled_cephx;
166
167 LogClient *log_client;
168 bool more_log_pending;
169
170 void send_log(bool flush = false);
171
172 std::unique_ptr<AuthMethodList> auth_supported;
173
174 bool ms_dispatch(Message *m) override;
175 bool ms_handle_reset(Connection *con) override;
176 void ms_handle_remote_reset(Connection *con) override {}
177 bool ms_handle_refused(Connection *con) override { return false; }
178
179 void handle_monmap(MMonMap *m);
180
181 void handle_auth(MAuthReply *m);
182
183 // monitor session
184 void tick();
185 void schedule_tick();
186
187 // monclient
188 bool want_monmap;
189 Cond map_cond;
31f18b77
FG
190 bool passthrough_monmap = false;
191
7c673cae
FG
192 // authenticate
193 std::unique_ptr<AuthClientHandler> auth;
194 uint32_t want_keys = 0;
195 uint64_t global_id = 0;
196 Cond auth_cond;
197 int authenticate_err = 0;
198 bool authenticated = false;
199
200 list<Message*> waiting_for_session;
201 utime_t last_rotating_renew_sent;
202 std::unique_ptr<Context> session_established_context;
203 bool had_a_connection;
204 double reopen_interval_multiplier;
205
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err);
211 void _reopen_session(int rank = -1);
212 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
c07f9fc5 213 void _un_backoff();
7c673cae
FG
214 void _add_conns(uint64_t global_id);
215 void _send_mon_message(Message *m);
216
217public:
218 void set_entity_name(EntityName name) { entity_name = name; }
219
220 int _check_auth_tickets();
221 int _check_auth_rotating();
222 int wait_auth_rotating(double timeout);
223
224 int authenticate(double timeout=0.0);
225 bool is_authenticated() const {return authenticated;}
226
94b18763
FG
227 bool is_connected() const { return active_con != nullptr; }
228
7c673cae
FG
229 /**
230 * Try to flush as many log messages as we can in a single
231 * message. Use this before shutting down to transmit your
232 * last message.
233 */
234 void flush_log();
235
236 // mon subscriptions
237private:
238 map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
239 map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
240 utime_t sub_renew_sent, sub_renew_after;
241
242 void _renew_subs();
243 void handle_subscribe_ack(MMonSubscribeAck* m);
244
245 bool _sub_want(const string &what, version_t start, unsigned flags) {
246 auto sub = sub_new.find(what);
247 if (sub != sub_new.end() &&
248 sub->second.start == start &&
249 sub->second.flags == flags) {
250 return false;
251 } else {
252 sub = sub_sent.find(what);
253 if (sub != sub_sent.end() &&
254 sub->second.start == start &&
255 sub->second.flags == flags)
256 return false;
257 }
258
259 sub_new[what].start = start;
260 sub_new[what].flags = flags;
261 return true;
262 }
263 void _sub_got(const string &what, version_t got) {
264 if (sub_new.count(what)) {
265 if (sub_new[what].start <= got) {
266 if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
267 sub_new.erase(what);
268 else
269 sub_new[what].start = got + 1;
270 }
271 } else if (sub_sent.count(what)) {
272 if (sub_sent[what].start <= got) {
273 if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
274 sub_sent.erase(what);
275 else
276 sub_sent[what].start = got + 1;
277 }
278 }
279 }
280 void _sub_unwant(const string &what) {
281 sub_sent.erase(what);
282 sub_new.erase(what);
283 }
284
285public:
286 void renew_subs() {
287 Mutex::Locker l(monc_lock);
288 _renew_subs();
289 }
290 bool sub_want(string what, version_t start, unsigned flags) {
291 Mutex::Locker l(monc_lock);
292 return _sub_want(what, start, flags);
293 }
294 void sub_got(string what, version_t have) {
295 Mutex::Locker l(monc_lock);
296 _sub_got(what, have);
297 }
298 void sub_unwant(string what) {
299 Mutex::Locker l(monc_lock);
300 _sub_unwant(what);
301 }
302 /**
303 * Increase the requested subscription start point. If you do increase
304 * the value, apply the passed-in flags as well; otherwise do nothing.
305 */
306 bool sub_want_increment(string what, version_t start, unsigned flags) {
307 Mutex::Locker l(monc_lock);
308 map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
309 if (i != sub_new.end()) {
310 if (i->second.start >= start)
311 return false;
312 i->second.start = start;
313 i->second.flags = flags;
314 return true;
315 }
316
317 i = sub_sent.find(what);
318 if (i == sub_sent.end() || i->second.start < start) {
319 ceph_mon_subscribe_item& item = sub_new[what];
320 item.start = start;
321 item.flags = flags;
322 return true;
323 }
324 return false;
325 }
326
327 std::unique_ptr<KeyRing> keyring;
328 std::unique_ptr<RotatingKeyRing> rotating_secrets;
329
330 public:
331 explicit MonClient(CephContext *cct_);
332 MonClient(const MonClient &) = delete;
333 MonClient& operator=(const MonClient &) = delete;
334 ~MonClient() override;
335
336 int init();
337 void shutdown();
338
339 void set_log_client(LogClient *clog) {
340 log_client = clog;
341 }
342
343 int build_initial_monmap();
344 int get_monmap();
345 int get_monmap_privately();
31f18b77
FG
346 /**
347 * If you want to see MonMap messages, set this and
348 * the MonClient will tell the Messenger it hasn't
349 * dealt with it.
350 * Note that if you do this, *you* are of course responsible for
351 * putting the message reference!
352 */
353 void set_passthrough_monmap() {
354 Mutex::Locker l(monc_lock);
355 passthrough_monmap = true;
356 }
357 void unset_passthrough_monmap() {
358 Mutex::Locker l(monc_lock);
359 passthrough_monmap = false;
360 }
7c673cae
FG
361 /**
362 * Ping monitor with ID @p mon_id and record the resulting
363 * reply in @p result_reply.
364 *
365 * @param[in] mon_id Target monitor's ID
366 * @param[out] result_reply reply from mon.ID, if param != NULL
367 * @returns 0 in case of success; < 0 in case of error,
368 * -ETIMEDOUT if monitor didn't reply before timeout
369 * expired (default: conf->client_mount_timeout).
370 */
371 int ping_monitor(const string &mon_id, string *result_reply);
372
373 void send_mon_message(Message *m) {
374 Mutex::Locker l(monc_lock);
375 _send_mon_message(m);
376 }
377 /**
378 * If you specify a callback, you should not call
379 * reopen_session() again until it has been triggered. The MonClient
380 * will behave, but the first callback could be triggered after
381 * the session has been killed and the MonClient has started trying
382 * to reconnect to another monitor.
383 */
384 void reopen_session(Context *cb=NULL) {
385 Mutex::Locker l(monc_lock);
386 if (cb) {
387 session_established_context.reset(cb);
388 }
389 _reopen_session();
390 }
391
392 entity_addr_t get_my_addr() const {
393 return my_addr;
394 }
395
396 const uuid_d& get_fsid() const {
397 return monmap.fsid;
398 }
399
400 entity_addr_t get_mon_addr(unsigned i) const {
401 Mutex::Locker l(monc_lock);
402 if (i < monmap.size())
403 return monmap.get_addr(i);
404 return entity_addr_t();
405 }
406 entity_inst_t get_mon_inst(unsigned i) const {
407 Mutex::Locker l(monc_lock);
408 if (i < monmap.size())
409 return monmap.get_inst(i);
410 return entity_inst_t();
411 }
412 int get_num_mon() const {
413 Mutex::Locker l(monc_lock);
414 return monmap.size();
415 }
416
417 uint64_t get_global_id() const {
418 Mutex::Locker l(monc_lock);
419 return global_id;
420 }
421
422 void set_messenger(Messenger *m) { messenger = m; }
423 entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
424 AuthAuthorizer* build_authorizer(int service_id) const;
425
426 void set_want_keys(uint32_t want) {
427 want_keys = want;
428 }
429
430 // admin commands
431private:
432 uint64_t last_mon_command_tid;
433 struct MonCommand {
434 string target_name;
435 int target_rank;
436 uint64_t tid;
437 vector<string> cmd;
438 bufferlist inbl;
439 bufferlist *poutbl;
440 string *prs;
441 int *prval;
442 Context *onfinish, *ontimeout;
443
444 explicit MonCommand(uint64_t t)
445 : target_rank(-1),
446 tid(t),
447 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
448 {}
449 };
450 map<uint64_t,MonCommand*> mon_commands;
451
452 void _send_command(MonCommand *r);
453 void _resend_mon_commands();
31f18b77 454 int _cancel_mon_command(uint64_t tid);
7c673cae
FG
455 void _finish_command(MonCommand *r, int ret, string rs);
456 void _finish_auth();
457 void handle_mon_command_ack(MMonCommandAck *ack);
458
459public:
460 void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
461 bufferlist *outbl, string *outs,
462 Context *onfinish);
463 void start_mon_command(int mon_rank,
464 const vector<string>& cmd, const bufferlist& inbl,
465 bufferlist *outbl, string *outs,
466 Context *onfinish);
467 void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
468 const vector<string>& cmd, const bufferlist& inbl,
469 bufferlist *outbl, string *outs,
470 Context *onfinish);
471
472 // version requests
473public:
474 /**
475 * get latest known version(s) of cluster map
476 *
477 * @param map string name of map (e.g., 'osdmap')
478 * @param newest pointer where newest map version will be stored
479 * @param oldest pointer where oldest map version will be stored
480 * @param onfinish context that will be triggered on completion
481 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
482 */
483 void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
484
485 /**
486 * Run a callback within our lock, with a reference
487 * to the MonMap
488 */
489 template<typename Callback, typename...Args>
490 auto with_monmap(Callback&& cb, Args&&...args) const ->
491 decltype(cb(monmap, std::forward<Args>(args)...)) {
492 Mutex::Locker l(monc_lock);
493 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
494 }
495
496private:
497 struct version_req_d {
498 Context *context;
499 version_t *newest, *oldest;
500 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
501 };
502
503 map<ceph_tid_t, version_req_d*> version_requests;
504 ceph_tid_t version_req_id;
505 void handle_get_version_reply(MMonGetVersionReply* m);
506
507
508};
509
510#endif