1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 #include <sstream>
22 #include <pthread.h>
23 #include <errno.h>
24
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/ceph_json.h"
29 #include "common/errno.h"
30 #include "common/ceph_json.h"
31 #include "include/buffer.h"
32 #include "include/stringify.h"
33 #include "include/util.h"
34
35 #include "messages/MLog.h"
36 #include "msg/Messenger.h"
37
38 // needed for static_cast
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MPoolOpReply.h"
41 #include "messages/MStatfsReply.h"
42 #include "messages/MGetPoolStatsReply.h"
43 #include "messages/MOSDOpReply.h"
44 #include "messages/MOSDMap.h"
45 #include "messages/MCommandReply.h"
46
47 #include "AioCompletionImpl.h"
48 #include "IoCtxImpl.h"
49 #include "PoolAsyncCompletionImpl.h"
50 #include "RadosClient.h"
51
52 #include "include/ceph_assert.h"
53 #include "common/EventTrace.h"
54
55 #define dout_subsys ceph_subsys_rados
56 #undef dout_prefix
57 #define dout_prefix *_dout << "librados: "
58
59 librados::RadosClient::RadosClient(CephContext *cct_)
60 : Dispatcher(cct_->get()),
61 cct_deleter{cct_, [](CephContext *p) {p->put();}},
62 conf(cct_->_conf),
63 state(DISCONNECTED),
64 monclient(cct_),
65 mgrclient(cct_, nullptr, &monclient.monmap),
66 messenger(NULL),
67 instance_id(0),
68 objecter(NULL),
69 timer(cct, lock),
70 refcnt(1),
71 log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
72 finisher(cct, "radosclient", "fn-radosclient")
73 {
74 conf.add_observer(this);
75 }
76
77 int64_t librados::RadosClient::lookup_pool(const char *name)
78 {
79 int r = wait_for_osdmap();
80 if (r < 0) {
81 return r;
82 }
83
84 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
85 name);
86 if (-ENOENT == ret) {
87 // Make sure we have the latest map
88 int r = wait_for_latest_osdmap();
89 if (r < 0)
90 return r;
91 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
92 name);
93 }
94
95 return ret;
96 }
97
98 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
99 {
100 bool requires;
101 int r = pool_requires_alignment2(pool_id, &requires);
102 if (r < 0) {
103 // Default to false; this is a bit problematic, since we
104 // don't actually know the answer yet.
105 return false;
106 }
107
108 return requires;
109 }
110
111 // a safer version of pool_requires_alignment
112 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
113 bool *requires)
114 {
115 if (!requires)
116 return -EINVAL;
117
118 int r = wait_for_osdmap();
119 if (r < 0) {
120 return r;
121 }
122
123 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
124 if (!o.have_pg_pool(pool_id)) {
125 return -ENOENT;
126 }
127 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
128 return 0;
129 });
130 }
131
132 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
133 {
134 uint64_t alignment;
135 int r = pool_required_alignment2(pool_id, &alignment);
136 if (r < 0) {
137 return 0;
138 }
139
140 return alignment;
141 }
142
143 // a safer version of pool_required_alignment
144 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
145 uint64_t *alignment)
146 {
147 if (!alignment)
148 return -EINVAL;
149
150 int r = wait_for_osdmap();
151 if (r < 0) {
152 return r;
153 }
154
155 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
156 if (!o.have_pg_pool(pool_id)) {
157 return -ENOENT;
158 }
159 *alignment = o.get_pg_pool(pool_id)->required_alignment();
160 return 0;
161 });
162 }
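/* [Editorial sketch, not part of the original source] The two *2 variants
 * above back the safer alignment queries in the public C API (assumed
 * names/signatures from librados.h); a caller that wants to distinguish
 * "pool missing" from "no alignment required" would use them roughly so:
 *
 *   int requires = 0;
 *   uint64_t alignment = 0;
 *   if (rados_ioctx_pool_requires_alignment2(io, &requires) == 0 && requires &&
 *       rados_ioctx_pool_required_alignment2(io, &alignment) == 0) {
 *     // appends to this (erasure-coded) pool must be multiples of `alignment`
 *   }
 */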
163
164 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
165 {
166 int r = wait_for_osdmap();
167 if (r < 0)
168 return r;
169 retry:
170 objecter->with_osdmap([&](const OSDMap& o) {
171 if (!o.have_pg_pool(pool_id)) {
172 r = -ENOENT;
173 } else {
174 r = 0;
175 *s = o.get_pool_name(pool_id);
176 }
177 });
178 if (r == -ENOENT && wait_latest_map) {
179 r = wait_for_latest_osdmap();
180 if (r < 0)
181 return r;
182 wait_latest_map = false;
183 goto retry;
184 }
185
186 return r;
187 }
188
189 int librados::RadosClient::get_fsid(std::string *s)
190 {
191 if (!s)
192 return -EINVAL;
193 std::lock_guard l(lock);
194 ostringstream oss;
195 oss << monclient.get_fsid();
196 *s = oss.str();
197 return 0;
198 }
199
200 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
201 {
202 int err = 0;
203 /* If we haven't yet connected, we have no way of telling whether we
204 * already built monc's initial monmap. If we are in the CONNECTED state,
205 * then it is safe to assume that we went through connect(), which does
206 * build a monmap.
207 */
208 if (state != CONNECTED) {
209 ldout(cct, 10) << __func__ << " build monmap" << dendl;
210 err = monclient.build_initial_monmap();
211 }
212 if (err < 0) {
213 return err;
214 }
215
216 err = monclient.ping_monitor(mon_id, result);
217 return err;
218 }
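/* [Editorial sketch, not part of the original source] ping_monitor() is
 * callable before connect(), which is why it may have to build the initial
 * monmap itself. Assumed C API usage ("a" is a placeholder monitor id):
 *
 *   char *outstr = NULL;
 *   size_t outstrlen = 0;
 *   if (rados_ping_monitor(cluster, "a", &outstr, &outstrlen) == 0) {
 *     // outstr holds the monitor's status report
 *     rados_buffer_free(outstr);
 *   }
 */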
219
220 int librados::RadosClient::connect()
221 {
222 int err;
223
224 // already connected?
225 if (state == CONNECTING)
226 return -EINPROGRESS;
227 if (state == CONNECTED)
228 return -EISCONN;
229 state = CONNECTING;
230
231 if (cct->_conf->log_early &&
232 !cct->_log->is_started()) {
233 cct->_log->start();
234 }
235
236 {
237 MonClient mc_bootstrap(cct);
238 err = mc_bootstrap.get_monmap_and_config();
239 if (err < 0)
240 return err;
241 }
242
243 common_init_finish(cct);
244
245 // get monmap
246 err = monclient.build_initial_monmap();
247 if (err < 0)
248 goto out;
249
250 err = -ENOMEM;
251 messenger = Messenger::create_client_messenger(cct, "radosclient");
252 if (!messenger)
253 goto out;
254
255 // require OSDREPLYMUX feature. this means we will fail to talk to
256 // old servers. this is necessary because otherwise we won't know
257 // how to decompose the reply data into its constituent pieces.
258 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
259
260 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
261
262 ldout(cct, 1) << "starting objecter" << dendl;
263
264 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, &finisher);
265 if (!objecter)
266 goto out;
267 objecter->set_balanced_budget();
268
269 monclient.set_messenger(messenger);
270 mgrclient.set_messenger(messenger);
271
272 objecter->init();
273 messenger->add_dispatcher_head(&mgrclient);
274 messenger->add_dispatcher_tail(objecter);
275 messenger->add_dispatcher_tail(this);
276
277 messenger->start();
278
279 ldout(cct, 1) << "setting wanted keys" << dendl;
280 monclient.set_want_keys(
281 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
282 ldout(cct, 1) << "calling monclient init" << dendl;
283 err = monclient.init();
284 if (err) {
285 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
286 shutdown();
287 goto out;
288 }
289
290 err = monclient.authenticate(conf->client_mount_timeout);
291 if (err) {
292 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
293 shutdown();
294 goto out;
295 }
296 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
297
298 // Detect older cluster, put mgrclient into compatible mode
299 mgrclient.set_mgr_optional(
300 !get_required_monitor_features().contains_all(
301 ceph::features::mon::FEATURE_LUMINOUS));
302
303 // MgrClient needs this (it doesn't have a MonClient reference itself)
304 monclient.sub_want("mgrmap", 0, 0);
305 monclient.renew_subs();
306
307 if (service_daemon) {
308 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
309 << daemon_name << dendl;
310 mgrclient.service_daemon_register(service_name, daemon_name,
311 daemon_metadata);
312 }
313 mgrclient.init();
314
315 objecter->set_client_incarnation(0);
316 objecter->start();
317 lock.lock();
318
319 timer.init();
320
321 finisher.start();
322
323 state = CONNECTED;
324 instance_id = monclient.get_global_id();
325
326 lock.unlock();
327
328 ldout(cct, 1) << "init done" << dendl;
329 err = 0;
330
331 out:
332 if (err) {
333 state = DISCONNECTED;
334
335 if (objecter) {
336 delete objecter;
337 objecter = NULL;
338 }
339 if (messenger) {
340 delete messenger;
341 messenger = NULL;
342 }
343 }
344
345 return err;
346 }
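/* [Editorial sketch, not part of the original source] connect() is driven
 * through the public API; a minimal C-API caller looks roughly like this
 * (error handling trimmed, config path is an assumption):
 *
 *   rados_t cluster;
 *   rados_create(&cluster, "admin");                       // builds the CephContext
 *   rados_conf_read_file(cluster, "/etc/ceph/ceph.conf");
 *   rados_connect(cluster);                                // -> RadosClient::connect()
 *   // ... I/O ...
 *   rados_shutdown(cluster);                               // -> shutdown() and put()
 */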
347
348 void librados::RadosClient::shutdown()
349 {
350 std::unique_lock l{lock};
351 if (state == DISCONNECTED) {
352 return;
353 }
354
355 bool need_objecter = false;
356 if (objecter && objecter->initialized) {
357 need_objecter = true;
358 }
359
360 if (state == CONNECTED) {
361 if (need_objecter) {
362 // make sure watch callbacks are flushed
363 watch_flush();
364 }
365 finisher.wait_for_empty();
366 finisher.stop();
367 }
368 state = DISCONNECTED;
369 instance_id = 0;
370 timer.shutdown(); // will drop+retake lock
371 l.unlock();
372 if (need_objecter) {
373 objecter->shutdown();
374 }
375 mgrclient.shutdown();
376
377 monclient.shutdown();
378 if (messenger) {
379 messenger->shutdown();
380 messenger->wait();
381 }
382 ldout(cct, 1) << "shutdown" << dendl;
383 }
384
385 int librados::RadosClient::watch_flush()
386 {
387 ldout(cct, 10) << __func__ << " enter" << dendl;
388 ceph::mutex mylock = ceph::make_mutex("RadosClient::watch_flush::mylock");
389 ceph::condition_variable cond;
390 bool done;
391 objecter->linger_callback_flush(new C_SafeCond(mylock, cond, &done));
392
393 std::unique_lock l{mylock};
394 cond.wait(l, [&done] { return done; });
395 ldout(cct, 10) << __func__ << " exit" << dendl;
396 return 0;
397 }
398
399 struct C_aio_watch_flush_Complete : public Context {
400 librados::RadosClient *client;
401 librados::AioCompletionImpl *c;
402
403 C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
404 : client(_client), c(_c) {
405 c->get();
406 }
407
408 void finish(int r) override {
409 c->lock.lock();
410 c->rval = r;
411 c->complete = true;
412 c->cond.notify_all();
413
414 if (c->callback_complete ||
415 c->callback_safe) {
416 client->finisher.queue(new librados::C_AioComplete(c));
417 }
418 c->put_unlock();
419 }
420 };
421
422 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
423 {
424 ldout(cct, 10) << __func__ << " enter" << dendl;
425 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
426 objecter->linger_callback_flush(oncomplete);
427 ldout(cct, 10) << __func__ << " exit" << dendl;
428 return 0;
429 }
430
431 uint64_t librados::RadosClient::get_instance_id()
432 {
433 return instance_id;
434 }
435
436 int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
437 {
438 int r = wait_for_osdmap();
439 if (r < 0) {
440 return r;
441 }
442
443 objecter->with_osdmap(
444 [require_osd_release](const OSDMap& o) {
445 *require_osd_release = ceph::to_integer<int8_t>(o.require_osd_release);
446 });
447 return 0;
448 }
449
450 int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
451 int8_t* require_min_compat_client)
452 {
453 int r = wait_for_osdmap();
454 if (r < 0) {
455 return r;
456 }
457
458 objecter->with_osdmap(
459 [min_compat_client, require_min_compat_client](const OSDMap& o) {
460 *min_compat_client =
461 ceph::to_integer<int8_t>(o.get_min_compat_client());
462 *require_min_compat_client =
463 ceph::to_integer<int8_t>(o.get_require_min_compat_client());
464 });
465 return 0;
466 }
467
468 librados::RadosClient::~RadosClient()
469 {
470 conf.remove_observer(this);
471 if (messenger)
472 delete messenger;
473 if (objecter)
474 delete objecter;
475 cct = NULL;
476 }
477
478 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
479 {
480 int64_t poolid = lookup_pool(name);
481 if (poolid < 0) {
482 return (int)poolid;
483 }
484
485 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
486 return 0;
487 }
488
489 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
490 {
491 std::string pool_name;
492 int r = pool_get_name(pool_id, &pool_name, true);
493 if (r < 0)
494 return r;
495 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
496 return 0;
497 }
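/* [Editorial sketch, not part of the original source] These two overloads
 * sit behind rados_ioctx_create()/rados_ioctx_create2() (assumed mapping);
 * typical C API usage with a placeholder pool name:
 *
 *   rados_ioctx_t io;
 *   int r = rados_ioctx_create(cluster, "rbd", &io);       // by name -> lookup_pool()
 *   // or: int64_t id = rados_pool_lookup(cluster, "rbd");
 *   //     r = rados_ioctx_create2(cluster, id, &io);      // by id -> pool_get_name()
 *   if (r == 0)
 *     rados_ioctx_destroy(io);
 */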
498
499 bool librados::RadosClient::ms_dispatch(Message *m)
500 {
501 bool ret;
502
503 std::lock_guard l(lock);
504 if (state == DISCONNECTED) {
505 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
506 m->put();
507 ret = true;
508 } else {
509 ret = _dispatch(m);
510 }
511 return ret;
512 }
513
514 void librados::RadosClient::ms_handle_connect(Connection *con)
515 {
516 }
517
518 bool librados::RadosClient::ms_handle_reset(Connection *con)
519 {
520 return false;
521 }
522
523 void librados::RadosClient::ms_handle_remote_reset(Connection *con)
524 {
525 }
526
527 bool librados::RadosClient::ms_handle_refused(Connection *con)
528 {
529 return false;
530 }
531
532 bool librados::RadosClient::_dispatch(Message *m)
533 {
534 ceph_assert(ceph_mutex_is_locked(lock));
535 switch (m->get_type()) {
536 // OSD
537 case CEPH_MSG_OSD_MAP:
538 cond.notify_all();
539 m->put();
540 break;
541
542 case CEPH_MSG_MDS_MAP:
543 m->put();
544 break;
545
546 case MSG_LOG:
547 handle_log(static_cast<MLog *>(m));
548 break;
549
550 default:
551 return false;
552 }
553
554 return true;
555 }
556
557
558 int librados::RadosClient::wait_for_osdmap()
559 {
560 ceph_assert(ceph_mutex_is_not_locked_by_me(lock));
561
562 if (state != CONNECTED) {
563 return -ENOTCONN;
564 }
565
566 bool need_map = false;
567 objecter->with_osdmap([&](const OSDMap& o) {
568 if (o.get_epoch() == 0) {
569 need_map = true;
570 }
571 });
572
573 if (need_map) {
574 std::unique_lock l(lock);
575
576 ceph::timespan timeout = rados_mon_op_timeout;
577 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
578 ldout(cct, 10) << __func__ << " waiting" << dendl;
579 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
580 if (timeout == timeout.zero()) {
581 cond.wait(l);
582 } else {
583 if (cond.wait_for(l, timeout) == std::cv_status::timeout) {
584 lderr(cct) << "timed out waiting for first osdmap from monitors"
585 << dendl;
586 return -ETIMEDOUT;
587 }
588 }
589 }
590 ldout(cct, 10) << __func__ << " done waiting" << dendl;
591 }
592 return 0;
593 } else {
594 return 0;
595 }
596 }
597
598
599 int librados::RadosClient::wait_for_latest_osdmap()
600 {
601 ceph::mutex mylock = ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
602 ceph::condition_variable cond;
603 bool done;
604
605 objecter->wait_for_latest_osdmap(new C_SafeCond(mylock, cond, &done));
606
607 std::unique_lock l{mylock};
608 cond.wait(l, [&done] {return done;});
609 return 0;
610 }
611
612 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
613 {
614 int r = wait_for_osdmap();
615 if (r < 0)
616 return r;
617
618 objecter->with_osdmap([&](const OSDMap& o) {
619 for (auto p : o.get_pools())
620 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
621 });
622 return 0;
623 }
624
625 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
626 map<string,::pool_stat_t> *result,
627 bool *per_pool)
628 {
629 ceph::mutex mylock = ceph::make_mutex("RadosClient::get_pool_stats::mylock");
630 ceph::condition_variable cond;
631 bool done;
632 int ret = 0;
633
634 objecter->get_pool_stats(pools, result, per_pool,
635 new C_SafeCond(mylock, cond, &done,
636 &ret));
637
638 unique_lock l{mylock};
639 cond.wait(l, [&done] { return done;});
640 return ret;
641 }
642
643 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
644 const std::string& pool)
645 {
646 bool ret = false;
647 objecter->with_osdmap([&](const OSDMap& osdmap) {
648 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
649 if (poolid >= 0)
650 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
651 });
652 return ret;
653 }
654
655 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
656 {
657 ceph::mutex mylock = ceph::make_mutex("RadosClient::get_fs_stats::mylock");
658 ceph::condition_variable cond;
659 bool done;
660 int ret = 0;
661 {
662 std::lock_guard l{mylock};
663 objecter->get_fs_stats(stats, boost::optional<int64_t> (),
664 new C_SafeCond(mylock, cond, &done, &ret));
665 }
666 {
667 std::unique_lock l{mylock};
668 cond.wait(l, [&done] { return done;});
669 }
670 return ret;
671 }
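/* [Editorial sketch, not part of the original source] get_fs_stats() is
 * what rados_cluster_stat() ultimately calls (assumed from the public C
 * API); the ceph_statfs result is surfaced roughly as:
 *
 *   struct rados_cluster_stat_t st;
 *   if (rados_cluster_stat(cluster, &st) == 0) {
 *     // st.kb, st.kb_used, st.kb_avail, st.num_objects
 *   }
 */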
672
673 void librados::RadosClient::get() {
674 std::lock_guard l(lock);
675 ceph_assert(refcnt > 0);
676 refcnt++;
677 }
678
679 bool librados::RadosClient::put() {
680 std::lock_guard l(lock);
681 ceph_assert(refcnt > 0);
682 refcnt--;
683 return (refcnt == 0);
684 }
685
686 int librados::RadosClient::pool_create(string& name,
687 int16_t crush_rule)
688 {
689 if (!name.length())
690 return -EINVAL;
691
692 int r = wait_for_osdmap();
693 if (r < 0) {
694 return r;
695 }
696
697 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_create::mylock");
698 int reply;
699 ceph::condition_variable cond;
700 bool done;
701 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
702 reply = objecter->create_pool(name, onfinish, crush_rule);
703
704 if (reply < 0) {
705 delete onfinish;
706 } else {
707 std::unique_lock l{mylock};
708 cond.wait(l, [&done] { return done; });
709 }
710 return reply;
711 }
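/* [Editorial sketch, not part of the original source] Synchronous pool
 * creation/deletion as seen from the C API (pool name is a placeholder):
 *
 *   if (rados_pool_create(cluster, "mypool") == 0) {
 *     // ... use the pool ...
 *     rados_pool_delete(cluster, "mypool");
 *   }
 */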
712
713 int librados::RadosClient::pool_create_async(string& name,
714 PoolAsyncCompletionImpl *c,
715 int16_t crush_rule)
716 {
717 int r = wait_for_osdmap();
718 if (r < 0)
719 return r;
720
721 Context *onfinish = new C_PoolAsync_Safe(c);
722 r = objecter->create_pool(name, onfinish, crush_rule);
723 if (r < 0) {
724 delete onfinish;
725 }
726 return r;
727 }
728
729 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
730 {
731 int r = wait_for_osdmap();
732 if (r < 0) {
733 return r;
734 }
735
736 objecter->with_osdmap([&](const OSDMap& o) {
737 const pg_pool_t* pool = o.get_pg_pool(pool_id);
738 if (pool) {
739 if (pool->tier_of < 0) {
740 *base_tier = pool_id;
741 } else {
742 *base_tier = pool->tier_of;
743 }
744 r = 0;
745 } else {
746 r = -ENOENT;
747 }
748 });
749 return r;
750 }
751
752 int librados::RadosClient::pool_delete(const char *name)
753 {
754 int r = wait_for_osdmap();
755 if (r < 0) {
756 return r;
757 }
758
759 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_delete::mylock");
760 ceph::condition_variable cond;
761 bool done;
762 int ret;
763 Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
764 ret = objecter->delete_pool(name, onfinish);
765
766 if (ret < 0) {
767 delete onfinish;
768 } else {
769 std::unique_lock l{mylock};
770 cond.wait(l, [&done] { return done;});
771 }
772 return ret;
773 }
774
775 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
776 {
777 int r = wait_for_osdmap();
778 if (r < 0)
779 return r;
780
781 Context *onfinish = new C_PoolAsync_Safe(c);
782 r = objecter->delete_pool(name, onfinish);
783 if (r < 0) {
784 delete onfinish;
785 }
786 return r;
787 }
788
789 void librados::RadosClient::blacklist_self(bool set) {
790 std::lock_guard l(lock);
791 objecter->blacklist_self(set);
792 }
793
794 std::string librados::RadosClient::get_addrs() const {
795 CachedStackStringStream cos;
796 *cos << messenger->get_myaddrs();
797 return std::string(cos->strv());
798 }
799
800 int librados::RadosClient::blacklist_add(const string& client_address,
801 uint32_t expire_seconds)
802 {
803 entity_addr_t addr;
804 if (!addr.parse(client_address.c_str(), 0)) {
805 lderr(cct) << "unable to parse address " << client_address << dendl;
806 return -EINVAL;
807 }
808
809 std::stringstream cmd;
810 cmd << "{"
811 << "\"prefix\": \"osd blacklist\", "
812 << "\"blacklistop\": \"add\", "
813 << "\"addr\": \"" << client_address << "\"";
814 if (expire_seconds != 0) {
815 cmd << ", \"expire\": " << expire_seconds << ".0";
816 }
817 cmd << "}";
818
819 std::vector<std::string> cmds;
820 cmds.push_back(cmd.str());
821 bufferlist inbl;
822 int r = mon_command(cmds, inbl, NULL, NULL);
823 if (r < 0) {
824 return r;
825 }
826
827 // ensure we have the latest osd map epoch before proceeding
828 r = wait_for_latest_osdmap();
829 return r;
830 }
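/* [Editorial note, not part of the original source] For a client_address of
 * "1.2.3.4:0/0" and expire_seconds = 3600, the code above sends the monitor
 * command:
 *
 *   {"prefix": "osd blacklist", "blacklistop": "add",
 *    "addr": "1.2.3.4:0/0", "expire": 3600.0}
 */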
831
832 int librados::RadosClient::mon_command(const vector<string>& cmd,
833 const bufferlist &inbl,
834 bufferlist *outbl, string *outs)
835 {
836 C_SaferCond ctx;
837 mon_command_async(cmd, inbl, outbl, outs, &ctx);
838 return ctx.wait();
839 }
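/* [Editorial sketch, not part of the original source] This synchronous path
 * is what rados_mon_command() goes through (assumed mapping); a hedged
 * usage example with an illustrative command:
 *
 *   const char *cmd[] = {"{\"prefix\": \"osd pool ls\", \"format\": \"json\"}"};
 *   char *outbuf = NULL, *outs = NULL;
 *   size_t outbuf_len = 0, outs_len = 0;
 *   int r = rados_mon_command(cluster, cmd, 1, "", 0,
 *                             &outbuf, &outbuf_len, &outs, &outs_len);
 *   rados_buffer_free(outbuf);
 *   rados_buffer_free(outs);
 */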
840
841 void librados::RadosClient::mon_command_async(const vector<string>& cmd,
842 const bufferlist &inbl,
843 bufferlist *outbl, string *outs,
844 Context *on_finish)
845 {
846 std::lock_guard l{lock};
847 monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
848 }
849
850 int librados::RadosClient::mgr_command(const vector<string>& cmd,
851 const bufferlist &inbl,
852 bufferlist *outbl, string *outs)
853 {
854 std::lock_guard l(lock);
855
856 C_SaferCond cond;
857 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
858 if (r < 0)
859 return r;
860
861 lock.unlock();
862 if (rados_mon_op_timeout.count() > 0) {
863 r = cond.wait_for(rados_mon_op_timeout);
864 } else {
865 r = cond.wait();
866 }
867 lock.lock();
868
869 return r;
870 }
871
872 int librados::RadosClient::mgr_command(
873 const string& name,
874 const vector<string>& cmd,
875 const bufferlist &inbl,
876 bufferlist *outbl, string *outs)
877 {
878 std::lock_guard l(lock);
879
880 C_SaferCond cond;
881 int r = mgrclient.start_tell_command(name, cmd, inbl, outbl, outs, &cond);
882 if (r < 0)
883 return r;
884
885 lock.unlock();
886 if (rados_mon_op_timeout.count() > 0) {
887 r = cond.wait_for(rados_mon_op_timeout);
888 } else {
889 r = cond.wait();
890 }
891 lock.lock();
892
893 return r;
894 }
895
896
897 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
898 const bufferlist &inbl,
899 bufferlist *outbl, string *outs)
900 {
901 ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
902 ceph::condition_variable cond;
903 bool done;
904 int rval;
905 {
906 std::lock_guard l{mylock};
907 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
908 new C_SafeCond(mylock, cond, &done, &rval));
909 }
910 std::unique_lock l{mylock};
911 cond.wait(l, [&done] { return done;});
912 return rval;
913 }
914
915 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
916 const bufferlist &inbl,
917 bufferlist *outbl, string *outs)
918 {
919 ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
920 ceph::condition_variable cond;
921 bool done;
922 int rval;
923 {
924 std::lock_guard l{mylock};
925 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
926 new C_SafeCond(mylock, cond, &done, &rval));
927 }
928 std::unique_lock l{mylock};
929 cond.wait(l, [&done] { return done;});
930 return rval;
931 }
932
933 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
934 const bufferlist& inbl,
935 bufferlist *poutbl, string *prs)
936 {
937 ceph::mutex mylock = ceph::make_mutex("RadosClient::osd_command::mylock");
938 ceph::condition_variable cond;
939 bool done;
940 int ret;
941 ceph_tid_t tid;
942
943 if (osd < 0)
944 return -EINVAL;
945
946 {
947 std::lock_guard l{mylock};
948 // XXX do anything with tid?
949 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
950 new C_SafeCond(mylock, cond, &done, &ret));
951 }
952 std::unique_lock l{mylock};
953 cond.wait(l, [&done] { return done;});
954 return ret;
955 }
956
957 int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
958 const bufferlist& inbl,
959 bufferlist *poutbl, string *prs)
960 {
961 ceph::mutex mylock = ceph::make_mutex("RadosClient::pg_command::mylock");
962 ceph::condition_variable cond;
963 bool done;
964 int ret;
965 ceph_tid_t tid;
966
967 {
968 std::lock_guard l{lock};
969 objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
970 new C_SafeCond(mylock, cond, &done, &ret));
971 }
972 std::unique_lock l{mylock};
973 cond.wait(l, [&done] { return done;});
974 return ret;
975 }
976
977 int librados::RadosClient::monitor_log(const string& level,
978 rados_log_callback_t cb,
979 rados_log_callback2_t cb2,
980 void *arg)
981 {
982 std::lock_guard l(lock);
983
984 if (state != CONNECTED) {
985 return -ENOTCONN;
986 }
987
988 if (cb == NULL && cb2 == NULL) {
989 // stop watch
990 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
991 << " " << (void*)log_cb2 << dendl;
992 monclient.sub_unwant(log_watch);
993 log_watch.clear();
994 log_cb = NULL;
995 log_cb2 = NULL;
996 log_cb_arg = NULL;
997 return 0;
998 }
999
1000 string watch_level;
1001 if (level == "debug") {
1002 watch_level = "log-debug";
1003 } else if (level == "info") {
1004 watch_level = "log-info";
1005 } else if (level == "warn" || level == "warning") {
1006 watch_level = "log-warn";
1007 } else if (level == "err" || level == "error") {
1008 watch_level = "log-error";
1009 } else if (level == "sec") {
1010 watch_level = "log-sec";
1011 } else {
1012 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
1013 return -EINVAL;
1014 }
1015
1016 if (log_cb || log_cb2)
1017 monclient.sub_unwant(log_watch);
1018
1019 // (re)start watch
1020 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
1021 << " level " << level << dendl;
1022 monclient.sub_want(watch_level, 0, 0);
1023
1024 monclient.renew_subs();
1025 log_cb = cb;
1026 log_cb2 = cb2;
1027 log_cb_arg = arg;
1028 log_watch = watch_level;
1029 return 0;
1030 }
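/* [Editorial sketch, not part of the original source] monitor_log() backs
 * rados_monitor_log() (assumed mapping); e.g. subscribing to the cluster
 * log at "info", then unsubscribing by passing a NULL callback:
 *
 *   void my_log_cb(void *arg, const char *line, const char *who,
 *                  uint64_t sec, uint64_t nsec, uint64_t seq,
 *                  const char *level, const char *msg) { }   // consume one line
 *
 *   rados_monitor_log(cluster, "info", my_log_cb, NULL);     // start watching
 *   rados_monitor_log(cluster, "info", NULL, NULL);          // stop watching
 */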
1031
1032 void librados::RadosClient::handle_log(MLog *m)
1033 {
1034 ceph_assert(ceph_mutex_is_locked(lock));
1035 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
1036
1037 if (log_last_version < m->version) {
1038 log_last_version = m->version;
1039
1040 if (log_cb || log_cb2) {
1041 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
1042 LogEntry e = *it;
1043 ostringstream ss;
1044 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
1045 string line = ss.str();
1046 string who = stringify(e.rank) + " " + stringify(e.addrs);
1047 string name = stringify(e.name);
1048 string level = stringify(e.prio);
1049 struct timespec stamp;
1050 e.stamp.to_timespec(&stamp);
1051
1052 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
1053 if (log_cb)
1054 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1055 stamp.tv_sec, stamp.tv_nsec,
1056 e.seq, level.c_str(), e.msg.c_str());
1057 if (log_cb2)
1058 log_cb2(log_cb_arg, line.c_str(),
1059 e.channel.c_str(),
1060 who.c_str(), name.c_str(),
1061 stamp.tv_sec, stamp.tv_nsec,
1062 e.seq, level.c_str(), e.msg.c_str());
1063 }
1064 }
1065
1066 monclient.sub_got(log_watch, log_last_version);
1067 }
1068
1069 m->put();
1070 }
1071
1072 int librados::RadosClient::service_daemon_register(
1073 const std::string& service, ///< service name (e.g., 'rgw')
1074 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1075 const std::map<std::string,std::string>& metadata)
1076 {
1077 if (service_daemon) {
1078 return -EEXIST;
1079 }
1080 if (service == "osd" ||
1081 service == "mds" ||
1082 service == "client" ||
1083 service == "mon" ||
1084 service == "mgr") {
1085 // normal ceph entity types are not allowed!
1086 return -EINVAL;
1087 }
1088 if (service.empty() || name.empty()) {
1089 return -EINVAL;
1090 }
1091
1092 collect_sys_info(&daemon_metadata, cct);
1093
1094 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1095 service_daemon = true;
1096 service_name = service;
1097 daemon_name = name;
1098 daemon_metadata.insert(metadata.begin(), metadata.end());
1099
1100 if (state == DISCONNECTED) {
1101 return 0;
1102 }
1103 if (state == CONNECTING) {
1104 return -EBUSY;
1105 }
1106 mgrclient.service_daemon_register(service_name, daemon_name,
1107 daemon_metadata);
1108 return 0;
1109 }
1110
1111 int librados::RadosClient::service_daemon_update_status(
1112 std::map<std::string,std::string>&& status)
1113 {
1114 if (state != CONNECTED) {
1115 return -ENOTCONN;
1116 }
1117 return mgrclient.service_daemon_update_status(std::move(status));
1118 }
1119
1120 mon_feature_t librados::RadosClient::get_required_monitor_features() const
1121 {
1122 return monclient.with_monmap([](const MonMap &monmap) {
1123 return monmap.get_required_features(); } );
1124 }
1125
1126 int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
1127 std::vector<std::string>* pgs)
1128 {
1129 vector<string> cmd = {
1130 "{\"prefix\": \"pg ls\","
1131 "\"pool\": " + std::to_string(pool_id) + ","
1132 "\"states\": [\"inconsistent\"],"
1133 "\"format\": \"json\"}"
1134 };
1135 bufferlist inbl, outbl;
1136 string outstring;
1137 if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
1138 return ret;
1139 }
1140 if (!outbl.length()) {
1141 // no pg returned
1142 return 0;
1143 }
1144 JSONParser parser;
1145 if (!parser.parse(outbl.c_str(), outbl.length())) {
1146 return -EINVAL;
1147 }
1148 vector<string> v;
1149 if (!parser.is_array()) {
1150 JSONObj *pgstat_obj = parser.find_obj("pg_stats");
1151 if (!pgstat_obj)
1152 return 0;
1153 auto s = pgstat_obj->get_data();
1154 JSONParser pg_stats;
1155 if (!pg_stats.parse(s.c_str(), s.length())) {
1156 return -EINVAL;
1157 }
1158 v = pg_stats.get_array_elements();
1159 } else {
1160 v = parser.get_array_elements();
1161 }
1162 for (auto i : v) {
1163 JSONParser pg_json;
1164 if (!pg_json.parse(i.c_str(), i.length())) {
1165 return -EINVAL;
1166 }
1167 string pgid;
1168 JSONDecoder::decode_json("pgid", pgid, &pg_json);
1169 pgs->emplace_back(std::move(pgid));
1170 }
1171 return 0;
1172 }
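/* [Editorial note, not part of the original source] The mgr command built
 * above and the reply shape this parser expects look roughly like:
 *
 *   request:  {"prefix": "pg ls", "pool": 3, "states": ["inconsistent"],
 *              "format": "json"}
 *   reply:    {"pg_stats": [{"pgid": "3.4", ...}, ...]}    // or a bare array
 *
 * Only the "pgid" field of each element is consumed.
 */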
1173
1174 const char** librados::RadosClient::get_tracked_conf_keys() const
1175 {
1176 static const char *config_keys[] = {
1177 "rados_mon_op_timeout",
1178 nullptr
1179 };
1180 return config_keys;
1181 }
1182
1183 void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
1184 const std::set<std::string> &changed)
1185 {
1186 if (changed.count("rados_mon_op_timeout")) {
1187 rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
1188 }
1189 }
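/* [Editorial note, not part of the original source] "rados_mon_op_timeout"
 * is the only key this observer tracks, so changing it at runtime (for
 * example with `ceph config set client rados_mon_op_timeout 30`, an assumed
 * invocation) reaches handle_conf_change() above and updates the timeout
 * used by wait_for_osdmap() and the mgr_command() waits.
 */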