]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
0b24e01e2bace5fe884b8265804d6a64d1986a47
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
17
18 #include <memory>
19
20 #include "msg/Messenger.h"
21
22 #include "MonMap.h"
23
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
27
28
// Forward declarations.  In this header these types are only used through
// pointers, references or std::unique_ptr members, so their full
// definitions are not required here.

// Types whose names carry the M* prefix.
class MMonMap;
class MMonGetVersionReply;
struct MMonSubscribeAck;
class MMonCommandAck;
struct MAuthReply;
class MAuthRotating;

// Logging.
class LogClient;

// Authentication and key storage.
struct AuthAuthorizer;
class AuthMethodList;
class AuthClientHandler;
class KeyRing;
class RotatingKeyRing;
41
42 struct MonClientPinger : public Dispatcher {
43
44 Mutex lock;
45 Cond ping_recvd_cond;
46 string *result;
47 bool done;
48
49 MonClientPinger(CephContext *cct_, string *res_) :
50 Dispatcher(cct_),
51 lock("MonClientPinger::lock"),
52 result(res_),
53 done(false)
54 { }
55
56 int wait_for_reply(double timeout = 0.0) {
57 utime_t until = ceph_clock_now();
58 until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
59 done = false;
60
61 int ret = 0;
62 while (!done) {
63 ret = ping_recvd_cond.WaitUntil(lock, until);
64 if (ret == ETIMEDOUT)
65 break;
66 }
67 return ret;
68 }
69
70 bool ms_dispatch(Message *m) override {
71 Mutex::Locker l(lock);
72 if (m->get_type() != CEPH_MSG_PING)
73 return false;
74
75 bufferlist &payload = m->get_payload();
76 if (result && payload.length() > 0) {
77 bufferlist::iterator p = payload.begin();
78 ::decode(*result, p);
79 }
80 done = true;
81 ping_recvd_cond.SignalAll();
82 m->put();
83 return true;
84 }
85 bool ms_handle_reset(Connection *con) override {
86 Mutex::Locker l(lock);
87 done = true;
88 ping_recvd_cond.SignalAll();
89 return true;
90 }
91 void ms_handle_remote_reset(Connection *con) override {}
92 bool ms_handle_refused(Connection *con) override {
93 return false;
94 }
95 };
96
97 class MonConnection {
98 public:
99 MonConnection(CephContext *cct,
100 ConnectionRef conn,
101 uint64_t global_id);
102 ~MonConnection();
103 MonConnection(MonConnection&& rhs) = default;
104 MonConnection& operator=(MonConnection&&) = default;
105 MonConnection(const MonConnection& rhs) = delete;
106 MonConnection& operator=(const MonConnection&) = delete;
107 int handle_auth(MAuthReply *m,
108 const EntityName& entity_name,
109 uint32_t want_keys,
110 RotatingKeyRing* keyring);
111 int authenticate(MAuthReply *m);
112 void start(epoch_t epoch,
113 const EntityName& entity_name,
114 const AuthMethodList& auth_supported);
115 bool have_session() const;
116 uint64_t get_global_id() const {
117 return global_id;
118 }
119 ConnectionRef get_con() {
120 return con;
121 }
122 std::unique_ptr<AuthClientHandler>& get_auth() {
123 return auth;
124 }
125
126 private:
127 int _negotiate(MAuthReply *m,
128 const EntityName& entity_name,
129 uint32_t want_keys,
130 RotatingKeyRing* keyring);
131
132 private:
133 CephContext *cct;
134 enum class State {
135 NONE,
136 NEGOTIATING,
137 AUTHENTICATING,
138 HAVE_SESSION,
139 };
140 State state = State::NONE;
141 ConnectionRef con;
142
143 std::unique_ptr<AuthClientHandler> auth;
144 uint64_t global_id;
145 };
146
147 class MonClient : public Dispatcher {
148 public:
149 MonMap monmap;
150 private:
151 Messenger *messenger;
152
153 std::unique_ptr<MonConnection> active_con;
154 std::map<entity_addr_t, MonConnection> pending_cons;
155
156 EntityName entity_name;
157
158 entity_addr_t my_addr;
159
160 mutable Mutex monc_lock;
161 SafeTimer timer;
162 Finisher finisher;
163
164 bool initialized;
165 bool no_keyring_disabled_cephx;
166
167 LogClient *log_client;
168 bool more_log_pending;
169
170 void send_log(bool flush = false);
171
172 std::unique_ptr<AuthMethodList> auth_supported;
173
174 bool ms_dispatch(Message *m) override;
175 bool ms_handle_reset(Connection *con) override;
176 void ms_handle_remote_reset(Connection *con) override {}
177 bool ms_handle_refused(Connection *con) override { return false; }
178
179 void handle_monmap(MMonMap *m);
180
181 void handle_auth(MAuthReply *m);
182
183 // monitor session
184 void tick();
185 void schedule_tick();
186
187 // monclient
188 bool want_monmap;
189 Cond map_cond;
190 bool passthrough_monmap = false;
191
192 // authenticate
193 std::unique_ptr<AuthClientHandler> auth;
194 uint32_t want_keys = 0;
195 uint64_t global_id = 0;
196 Cond auth_cond;
197 int authenticate_err = 0;
198 bool authenticated = false;
199
200 list<Message*> waiting_for_session;
201 utime_t last_rotating_renew_sent;
202 std::unique_ptr<Context> session_established_context;
203 bool had_a_connection;
204 double reopen_interval_multiplier;
205
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err);
211 void _reopen_session(int rank = -1);
212 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
213 void _un_backoff();
214 void _add_conns(uint64_t global_id);
215 void _send_mon_message(Message *m);
216
217 public:
218 void set_entity_name(EntityName name) { entity_name = name; }
219
220 int _check_auth_tickets();
221 int _check_auth_rotating();
222 int wait_auth_rotating(double timeout);
223
224 int authenticate(double timeout=0.0);
225 bool is_authenticated() const {return authenticated;}
226
227 /**
228 * Try to flush as many log messages as we can in a single
229 * message. Use this before shutting down to transmit your
230 * last message.
231 */
232 void flush_log();
233
234 // mon subscriptions
235 private:
236 map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
237 map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
238 utime_t sub_renew_sent, sub_renew_after;
239
240 void _renew_subs();
241 void handle_subscribe_ack(MMonSubscribeAck* m);
242
243 bool _sub_want(const string &what, version_t start, unsigned flags) {
244 auto sub = sub_new.find(what);
245 if (sub != sub_new.end() &&
246 sub->second.start == start &&
247 sub->second.flags == flags) {
248 return false;
249 } else {
250 sub = sub_sent.find(what);
251 if (sub != sub_sent.end() &&
252 sub->second.start == start &&
253 sub->second.flags == flags)
254 return false;
255 }
256
257 sub_new[what].start = start;
258 sub_new[what].flags = flags;
259 return true;
260 }
261 void _sub_got(const string &what, version_t got) {
262 if (sub_new.count(what)) {
263 if (sub_new[what].start <= got) {
264 if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
265 sub_new.erase(what);
266 else
267 sub_new[what].start = got + 1;
268 }
269 } else if (sub_sent.count(what)) {
270 if (sub_sent[what].start <= got) {
271 if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
272 sub_sent.erase(what);
273 else
274 sub_sent[what].start = got + 1;
275 }
276 }
277 }
278 void _sub_unwant(const string &what) {
279 sub_sent.erase(what);
280 sub_new.erase(what);
281 }
282
283 public:
284 void renew_subs() {
285 Mutex::Locker l(monc_lock);
286 _renew_subs();
287 }
288 bool sub_want(string what, version_t start, unsigned flags) {
289 Mutex::Locker l(monc_lock);
290 return _sub_want(what, start, flags);
291 }
292 void sub_got(string what, version_t have) {
293 Mutex::Locker l(monc_lock);
294 _sub_got(what, have);
295 }
296 void sub_unwant(string what) {
297 Mutex::Locker l(monc_lock);
298 _sub_unwant(what);
299 }
300 /**
301 * Increase the requested subscription start point. If you do increase
302 * the value, apply the passed-in flags as well; otherwise do nothing.
303 */
304 bool sub_want_increment(string what, version_t start, unsigned flags) {
305 Mutex::Locker l(monc_lock);
306 map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
307 if (i != sub_new.end()) {
308 if (i->second.start >= start)
309 return false;
310 i->second.start = start;
311 i->second.flags = flags;
312 return true;
313 }
314
315 i = sub_sent.find(what);
316 if (i == sub_sent.end() || i->second.start < start) {
317 ceph_mon_subscribe_item& item = sub_new[what];
318 item.start = start;
319 item.flags = flags;
320 return true;
321 }
322 return false;
323 }
324
325 std::unique_ptr<KeyRing> keyring;
326 std::unique_ptr<RotatingKeyRing> rotating_secrets;
327
328 public:
329 explicit MonClient(CephContext *cct_);
330 MonClient(const MonClient &) = delete;
331 MonClient& operator=(const MonClient &) = delete;
332 ~MonClient() override;
333
334 int init();
335 void shutdown();
336
337 void set_log_client(LogClient *clog) {
338 log_client = clog;
339 }
340
341 int build_initial_monmap();
342 int get_monmap();
343 int get_monmap_privately();
344 /**
345 * If you want to see MonMap messages, set this and
346 * the MonClient will tell the Messenger it hasn't
347 * dealt with it.
348 * Note that if you do this, *you* are of course responsible for
349 * putting the message reference!
350 */
351 void set_passthrough_monmap() {
352 Mutex::Locker l(monc_lock);
353 passthrough_monmap = true;
354 }
355 void unset_passthrough_monmap() {
356 Mutex::Locker l(monc_lock);
357 passthrough_monmap = false;
358 }
359 /**
360 * Ping monitor with ID @p mon_id and record the resulting
361 * reply in @p result_reply.
362 *
363 * @param[in] mon_id Target monitor's ID
364 * @param[out] result_reply reply from mon.ID, if param != NULL
365 * @returns 0 in case of success; < 0 in case of error,
366 * -ETIMEDOUT if monitor didn't reply before timeout
367 * expired (default: conf->client_mount_timeout).
368 */
369 int ping_monitor(const string &mon_id, string *result_reply);
370
371 void send_mon_message(Message *m) {
372 Mutex::Locker l(monc_lock);
373 _send_mon_message(m);
374 }
375 /**
376 * If you specify a callback, you should not call
377 * reopen_session() again until it has been triggered. The MonClient
378 * will behave, but the first callback could be triggered after
379 * the session has been killed and the MonClient has started trying
380 * to reconnect to another monitor.
381 */
382 void reopen_session(Context *cb=NULL) {
383 Mutex::Locker l(monc_lock);
384 if (cb) {
385 session_established_context.reset(cb);
386 }
387 _reopen_session();
388 }
389
390 entity_addr_t get_my_addr() const {
391 return my_addr;
392 }
393
394 const uuid_d& get_fsid() const {
395 return monmap.fsid;
396 }
397
398 entity_addr_t get_mon_addr(unsigned i) const {
399 Mutex::Locker l(monc_lock);
400 if (i < monmap.size())
401 return monmap.get_addr(i);
402 return entity_addr_t();
403 }
404 entity_inst_t get_mon_inst(unsigned i) const {
405 Mutex::Locker l(monc_lock);
406 if (i < monmap.size())
407 return monmap.get_inst(i);
408 return entity_inst_t();
409 }
410 int get_num_mon() const {
411 Mutex::Locker l(monc_lock);
412 return monmap.size();
413 }
414
415 uint64_t get_global_id() const {
416 Mutex::Locker l(monc_lock);
417 return global_id;
418 }
419
420 void set_messenger(Messenger *m) { messenger = m; }
421 entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
422 AuthAuthorizer* build_authorizer(int service_id) const;
423
424 void set_want_keys(uint32_t want) {
425 want_keys = want;
426 }
427
428 // admin commands
429 private:
430 uint64_t last_mon_command_tid;
431 struct MonCommand {
432 string target_name;
433 int target_rank;
434 uint64_t tid;
435 vector<string> cmd;
436 bufferlist inbl;
437 bufferlist *poutbl;
438 string *prs;
439 int *prval;
440 Context *onfinish, *ontimeout;
441
442 explicit MonCommand(uint64_t t)
443 : target_rank(-1),
444 tid(t),
445 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
446 {}
447 };
448 map<uint64_t,MonCommand*> mon_commands;
449
450 void _send_command(MonCommand *r);
451 void _resend_mon_commands();
452 int _cancel_mon_command(uint64_t tid);
453 void _finish_command(MonCommand *r, int ret, string rs);
454 void _finish_auth();
455 void handle_mon_command_ack(MMonCommandAck *ack);
456
457 public:
458 void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
459 bufferlist *outbl, string *outs,
460 Context *onfinish);
461 void start_mon_command(int mon_rank,
462 const vector<string>& cmd, const bufferlist& inbl,
463 bufferlist *outbl, string *outs,
464 Context *onfinish);
465 void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
466 const vector<string>& cmd, const bufferlist& inbl,
467 bufferlist *outbl, string *outs,
468 Context *onfinish);
469
470 // version requests
471 public:
472 /**
473 * get latest known version(s) of cluster map
474 *
475 * @param map string name of map (e.g., 'osdmap')
476 * @param newest pointer where newest map version will be stored
477 * @param oldest pointer where oldest map version will be stored
478 * @param onfinish context that will be triggered on completion
479 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
480 */
481 void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
482
483 /**
484 * Run a callback within our lock, with a reference
485 * to the MonMap
486 */
487 template<typename Callback, typename...Args>
488 auto with_monmap(Callback&& cb, Args&&...args) const ->
489 decltype(cb(monmap, std::forward<Args>(args)...)) {
490 Mutex::Locker l(monc_lock);
491 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
492 }
493
494 private:
495 struct version_req_d {
496 Context *context;
497 version_t *newest, *oldest;
498 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
499 };
500
501 map<ceph_tid_t, version_req_d*> version_requests;
502 ceph_tid_t version_req_id;
503 void handle_get_version_reply(MMonGetVersionReply* m);
504
505
506 };
507
508 #endif