]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
17
18 #include <memory>
19
20 #include "msg/Messenger.h"
21
22 #include "MonMap.h"
23
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
27
28
// Forward declarations.  This header only names these types through
// pointers, references and std::unique_ptr, so their full definitions are
// not needed here.  The class/struct key of each one is kept as declared.
struct AuthAuthorizer;
class AuthClientHandler;
class AuthMethodList;
class KeyRing;
class LogClient;
struct MAuthReply;
class MAuthRotating;
class MMonCommandAck;
class MMonGetVersionReply;
class MMonMap;
struct MMonSubscribeAck;
class RotatingKeyRing;
41
42 struct MonClientPinger : public Dispatcher {
43
44 Mutex lock;
45 Cond ping_recvd_cond;
46 string *result;
47 bool done;
48
49 MonClientPinger(CephContext *cct_, string *res_) :
50 Dispatcher(cct_),
51 lock("MonClientPinger::lock"),
52 result(res_),
53 done(false)
54 { }
55
56 int wait_for_reply(double timeout = 0.0) {
57 utime_t until = ceph_clock_now();
58 until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
59 done = false;
60
61 int ret = 0;
62 while (!done) {
63 ret = ping_recvd_cond.WaitUntil(lock, until);
64 if (ret == ETIMEDOUT)
65 break;
66 }
67 return ret;
68 }
69
70 bool ms_dispatch(Message *m) override {
71 Mutex::Locker l(lock);
72 if (m->get_type() != CEPH_MSG_PING)
73 return false;
74
75 bufferlist &payload = m->get_payload();
76 if (result && payload.length() > 0) {
77 bufferlist::iterator p = payload.begin();
78 ::decode(*result, p);
79 }
80 done = true;
81 ping_recvd_cond.SignalAll();
82 m->put();
83 return true;
84 }
85 bool ms_handle_reset(Connection *con) override {
86 Mutex::Locker l(lock);
87 done = true;
88 ping_recvd_cond.SignalAll();
89 return true;
90 }
91 void ms_handle_remote_reset(Connection *con) override {}
92 bool ms_handle_refused(Connection *con) override {
93 return false;
94 }
95 };
96
97 class MonConnection {
98 public:
99 MonConnection(CephContext *cct,
100 ConnectionRef conn,
101 uint64_t global_id);
102 ~MonConnection();
103 MonConnection(MonConnection&& rhs) = default;
104 MonConnection& operator=(MonConnection&&) = default;
105 MonConnection(const MonConnection& rhs) = delete;
106 MonConnection& operator=(const MonConnection&) = delete;
107 int handle_auth(MAuthReply *m,
108 const EntityName& entity_name,
109 uint32_t want_keys,
110 RotatingKeyRing* keyring);
111 int authenticate(MAuthReply *m);
112 void start(epoch_t epoch,
113 const EntityName& entity_name,
114 const AuthMethodList& auth_supported);
115 bool have_session() const;
116 uint64_t get_global_id() const {
117 return global_id;
118 }
119 ConnectionRef get_con() {
120 return con;
121 }
122 std::unique_ptr<AuthClientHandler>& get_auth() {
123 return auth;
124 }
125
126 private:
127 int _negotiate(MAuthReply *m,
128 const EntityName& entity_name,
129 uint32_t want_keys,
130 RotatingKeyRing* keyring);
131
132 private:
133 CephContext *cct;
134 enum class State {
135 NONE,
136 NEGOTIATING,
137 AUTHENTICATING,
138 HAVE_SESSION,
139 };
140 State state = State::NONE;
141 ConnectionRef con;
142
143 std::unique_ptr<AuthClientHandler> auth;
144 uint64_t global_id;
145 };
146
147 class MonClient : public Dispatcher {
148 public:
149 MonMap monmap;
150 private:
151 Messenger *messenger;
152
153 std::unique_ptr<MonConnection> active_con;
154 std::map<entity_addr_t, MonConnection> pending_cons;
155
156 EntityName entity_name;
157
158 entity_addr_t my_addr;
159
160 mutable Mutex monc_lock;
161 SafeTimer timer;
162 Finisher finisher;
163
164 bool initialized;
165 bool no_keyring_disabled_cephx;
166
167 LogClient *log_client;
168 bool more_log_pending;
169
170 void send_log(bool flush = false);
171
172 std::unique_ptr<AuthMethodList> auth_supported;
173
174 bool ms_dispatch(Message *m) override;
175 bool ms_handle_reset(Connection *con) override;
176 void ms_handle_remote_reset(Connection *con) override {}
177 bool ms_handle_refused(Connection *con) override { return false; }
178
179 void handle_monmap(MMonMap *m);
180
181 void handle_auth(MAuthReply *m);
182
183 // monitor session
184 void tick();
185 void schedule_tick();
186
187 // monclient
188 bool want_monmap;
189 Cond map_cond;
190 private:
191 // authenticate
192 std::unique_ptr<AuthClientHandler> auth;
193 uint32_t want_keys = 0;
194 uint64_t global_id = 0;
195 Cond auth_cond;
196 int authenticate_err = 0;
197 bool authenticated = false;
198
199 list<Message*> waiting_for_session;
200 utime_t last_rotating_renew_sent;
201 std::unique_ptr<Context> session_established_context;
202 bool had_a_connection;
203 double reopen_interval_multiplier;
204
205 bool _opened() const;
206 bool _hunting() const;
207 void _start_hunting();
208 void _finish_hunting();
209 void _finish_auth(int auth_err);
210 void _reopen_session(int rank = -1);
211 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
212 void _add_conns(uint64_t global_id);
213 void _send_mon_message(Message *m);
214
215 public:
216 void set_entity_name(EntityName name) { entity_name = name; }
217
218 int _check_auth_tickets();
219 int _check_auth_rotating();
220 int wait_auth_rotating(double timeout);
221
222 int authenticate(double timeout=0.0);
223 bool is_authenticated() const {return authenticated;}
224
225 /**
226 * Try to flush as many log messages as we can in a single
227 * message. Use this before shutting down to transmit your
228 * last message.
229 */
230 void flush_log();
231
232 // mon subscriptions
233 private:
234 map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
235 map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
236 utime_t sub_renew_sent, sub_renew_after;
237
238 void _renew_subs();
239 void handle_subscribe_ack(MMonSubscribeAck* m);
240
241 bool _sub_want(const string &what, version_t start, unsigned flags) {
242 auto sub = sub_new.find(what);
243 if (sub != sub_new.end() &&
244 sub->second.start == start &&
245 sub->second.flags == flags) {
246 return false;
247 } else {
248 sub = sub_sent.find(what);
249 if (sub != sub_sent.end() &&
250 sub->second.start == start &&
251 sub->second.flags == flags)
252 return false;
253 }
254
255 sub_new[what].start = start;
256 sub_new[what].flags = flags;
257 return true;
258 }
259 void _sub_got(const string &what, version_t got) {
260 if (sub_new.count(what)) {
261 if (sub_new[what].start <= got) {
262 if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
263 sub_new.erase(what);
264 else
265 sub_new[what].start = got + 1;
266 }
267 } else if (sub_sent.count(what)) {
268 if (sub_sent[what].start <= got) {
269 if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
270 sub_sent.erase(what);
271 else
272 sub_sent[what].start = got + 1;
273 }
274 }
275 }
276 void _sub_unwant(const string &what) {
277 sub_sent.erase(what);
278 sub_new.erase(what);
279 }
280
281 public:
282 void renew_subs() {
283 Mutex::Locker l(monc_lock);
284 _renew_subs();
285 }
286 bool sub_want(string what, version_t start, unsigned flags) {
287 Mutex::Locker l(monc_lock);
288 return _sub_want(what, start, flags);
289 }
290 void sub_got(string what, version_t have) {
291 Mutex::Locker l(monc_lock);
292 _sub_got(what, have);
293 }
294 void sub_unwant(string what) {
295 Mutex::Locker l(monc_lock);
296 _sub_unwant(what);
297 }
298 /**
299 * Increase the requested subscription start point. If you do increase
300 * the value, apply the passed-in flags as well; otherwise do nothing.
301 */
302 bool sub_want_increment(string what, version_t start, unsigned flags) {
303 Mutex::Locker l(monc_lock);
304 map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
305 if (i != sub_new.end()) {
306 if (i->second.start >= start)
307 return false;
308 i->second.start = start;
309 i->second.flags = flags;
310 return true;
311 }
312
313 i = sub_sent.find(what);
314 if (i == sub_sent.end() || i->second.start < start) {
315 ceph_mon_subscribe_item& item = sub_new[what];
316 item.start = start;
317 item.flags = flags;
318 return true;
319 }
320 return false;
321 }
322
323 std::unique_ptr<KeyRing> keyring;
324 std::unique_ptr<RotatingKeyRing> rotating_secrets;
325
326 public:
327 explicit MonClient(CephContext *cct_);
328 MonClient(const MonClient &) = delete;
329 MonClient& operator=(const MonClient &) = delete;
330 ~MonClient() override;
331
332 int init();
333 void shutdown();
334
335 void set_log_client(LogClient *clog) {
336 log_client = clog;
337 }
338
339 int build_initial_monmap();
340 int get_monmap();
341 int get_monmap_privately();
342 /**
343 * Ping monitor with ID @p mon_id and record the resulting
344 * reply in @p result_reply.
345 *
346 * @param[in] mon_id Target monitor's ID
347 * @param[out] result_reply reply from mon.ID, if param != NULL
348 * @returns 0 in case of success; < 0 in case of error,
349 * -ETIMEDOUT if monitor didn't reply before timeout
350 * expired (default: conf->client_mount_timeout).
351 */
352 int ping_monitor(const string &mon_id, string *result_reply);
353
354 void send_mon_message(Message *m) {
355 Mutex::Locker l(monc_lock);
356 _send_mon_message(m);
357 }
358 /**
359 * If you specify a callback, you should not call
360 * reopen_session() again until it has been triggered. The MonClient
361 * will behave, but the first callback could be triggered after
362 * the session has been killed and the MonClient has started trying
363 * to reconnect to another monitor.
364 */
365 void reopen_session(Context *cb=NULL) {
366 Mutex::Locker l(monc_lock);
367 if (cb) {
368 session_established_context.reset(cb);
369 }
370 _reopen_session();
371 }
372
373 entity_addr_t get_my_addr() const {
374 return my_addr;
375 }
376
377 const uuid_d& get_fsid() const {
378 return monmap.fsid;
379 }
380
381 entity_addr_t get_mon_addr(unsigned i) const {
382 Mutex::Locker l(monc_lock);
383 if (i < monmap.size())
384 return monmap.get_addr(i);
385 return entity_addr_t();
386 }
387 entity_inst_t get_mon_inst(unsigned i) const {
388 Mutex::Locker l(monc_lock);
389 if (i < monmap.size())
390 return monmap.get_inst(i);
391 return entity_inst_t();
392 }
393 int get_num_mon() const {
394 Mutex::Locker l(monc_lock);
395 return monmap.size();
396 }
397
398 uint64_t get_global_id() const {
399 Mutex::Locker l(monc_lock);
400 return global_id;
401 }
402
403 void set_messenger(Messenger *m) { messenger = m; }
404 entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
405 AuthAuthorizer* build_authorizer(int service_id) const;
406
407 void set_want_keys(uint32_t want) {
408 want_keys = want;
409 }
410
411 // admin commands
412 private:
413 uint64_t last_mon_command_tid;
414 struct MonCommand {
415 string target_name;
416 int target_rank;
417 uint64_t tid;
418 vector<string> cmd;
419 bufferlist inbl;
420 bufferlist *poutbl;
421 string *prs;
422 int *prval;
423 Context *onfinish, *ontimeout;
424
425 explicit MonCommand(uint64_t t)
426 : target_rank(-1),
427 tid(t),
428 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
429 {}
430 };
431 map<uint64_t,MonCommand*> mon_commands;
432
433 void _send_command(MonCommand *r);
434 void _resend_mon_commands();
435 int _cancel_mon_command(uint64_t tid, int r);
436 void _finish_command(MonCommand *r, int ret, string rs);
437 void _finish_auth();
438 void handle_mon_command_ack(MMonCommandAck *ack);
439
440 public:
441 void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
442 bufferlist *outbl, string *outs,
443 Context *onfinish);
444 void start_mon_command(int mon_rank,
445 const vector<string>& cmd, const bufferlist& inbl,
446 bufferlist *outbl, string *outs,
447 Context *onfinish);
448 void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
449 const vector<string>& cmd, const bufferlist& inbl,
450 bufferlist *outbl, string *outs,
451 Context *onfinish);
452
453 // version requests
454 public:
455 /**
456 * get latest known version(s) of cluster map
457 *
458 * @param map string name of map (e.g., 'osdmap')
459 * @param newest pointer where newest map version will be stored
460 * @param oldest pointer where oldest map version will be stored
461 * @param onfinish context that will be triggered on completion
462 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
463 */
464 void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
465
466 /**
467 * Run a callback within our lock, with a reference
468 * to the MonMap
469 */
470 template<typename Callback, typename...Args>
471 auto with_monmap(Callback&& cb, Args&&...args) const ->
472 decltype(cb(monmap, std::forward<Args>(args)...)) {
473 Mutex::Locker l(monc_lock);
474 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
475 }
476
477 private:
478 struct version_req_d {
479 Context *context;
480 version_t *newest, *oldest;
481 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
482 };
483
484 map<ceph_tid_t, version_req_d*> version_requests;
485 ceph_tid_t version_req_id;
486 void handle_get_version_reply(MMonGetVersionReply* m);
487
488
489 };
490
491 #endif