1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <sys/types.h>

#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "common/ceph_context.h"
#include "common/config.h"
#include "common/common_init.h"
#include "common/ceph_json.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "include/buffer.h"
#include "include/ceph_assert.h"
#include "include/stringify.h"
#include "include/util.h"

#include "messages/MLog.h"
#include "msg/Messenger.h"

// needed for static_cast
#include "messages/PaxosServiceMessage.h"
#include "messages/MPoolOpReply.h"
#include "messages/MStatfsReply.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDMap.h"
#include "messages/MCommandReply.h"

#include "AioCompletionImpl.h"
#include "IoCtxImpl.h"
#include "PoolAsyncCompletionImpl.h"
#include "RadosClient.h"
55 #define dout_subsys ceph_subsys_rados
57 #define dout_prefix *_dout << "librados: "
59 librados::RadosClient::RadosClient(CephContext
*cct_
)
60 : Dispatcher(cct_
->get()),
61 cct_deleter
{cct_
, [](CephContext
*p
) {p
->put();}},
65 mgrclient(cct_
, nullptr, &monclient
.monmap
),
71 log_last_version(0), log_cb(NULL
), log_cb2(NULL
), log_cb_arg(NULL
),
72 finisher(cct
, "radosclient", "fn-radosclient")
76 int64_t librados::RadosClient::lookup_pool(const char *name
)
78 int r
= wait_for_osdmap();
83 int64_t ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
86 // Make sure we have the latest map
87 int r
= wait_for_latest_osdmap();
90 ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
97 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id
)
100 int r
= pool_requires_alignment2(pool_id
, &requires
);
102 // Cast answer to false, this is a little bit problematic
103 // since we really don't know the answer yet, say.
110 // a safer version of pool_requires_alignment
111 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id
,
117 int r
= wait_for_osdmap();
122 return objecter
->with_osdmap([requires
, pool_id
](const OSDMap
& o
) {
123 if (!o
.have_pg_pool(pool_id
)) {
126 *requires
= o
.get_pg_pool(pool_id
)->requires_aligned_append();
131 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id
)
134 int r
= pool_required_alignment2(pool_id
, &alignment
);
142 // a safer version of pool_required_alignment
143 int librados::RadosClient::pool_required_alignment2(int64_t pool_id
,
149 int r
= wait_for_osdmap();
154 return objecter
->with_osdmap([alignment
, pool_id
](const OSDMap
&o
) {
155 if (!o
.have_pg_pool(pool_id
)) {
158 *alignment
= o
.get_pg_pool(pool_id
)->required_alignment();
163 int librados::RadosClient::pool_get_name(uint64_t pool_id
, std::string
*s
, bool wait_latest_map
)
165 int r
= wait_for_osdmap();
169 objecter
->with_osdmap([&](const OSDMap
& o
) {
170 if (!o
.have_pg_pool(pool_id
)) {
174 *s
= o
.get_pool_name(pool_id
);
177 if (r
== -ENOENT
&& wait_latest_map
) {
178 r
= wait_for_latest_osdmap();
181 wait_latest_map
= false;
188 int librados::RadosClient::get_fsid(std::string
*s
)
192 std::lock_guard
l(lock
);
194 oss
<< monclient
.get_fsid();
199 int librados::RadosClient::ping_monitor(const string mon_id
, string
*result
)
202 /* If we haven't yet connected, we have no way of telling whether we
203 * already built monc's initial monmap. IF we are in CONNECTED state,
204 * then it is safe to assume that we went through connect(), which does
207 if (state
!= CONNECTED
) {
208 ldout(cct
, 10) << __func__
<< " build monmap" << dendl
;
209 err
= monclient
.build_initial_monmap();
215 err
= monclient
.ping_monitor(mon_id
, result
);
219 int librados::RadosClient::connect()
223 // already connected?
224 if (state
== CONNECTING
)
226 if (state
== CONNECTED
)
230 if (cct
->_conf
->log_early
&&
231 !cct
->_log
->is_started()) {
236 MonClient
mc_bootstrap(cct
);
237 err
= mc_bootstrap
.get_monmap_and_config();
242 common_init_finish(cct
);
245 err
= monclient
.build_initial_monmap();
250 messenger
= Messenger::create_client_messenger(cct
, "radosclient");
254 // require OSDREPLYMUX feature. this means we will fail to talk to
255 // old servers. this is necessary because otherwise we won't know
256 // how to decompose the reply data into its constituent pieces.
257 messenger
->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX
));
259 ldout(cct
, 1) << "starting msgr at " << messenger
->get_myaddrs() << dendl
;
261 ldout(cct
, 1) << "starting objecter" << dendl
;
263 objecter
= new (std::nothrow
) Objecter(cct
, messenger
, &monclient
,
265 cct
->_conf
->rados_mon_op_timeout
,
266 cct
->_conf
->rados_osd_op_timeout
);
269 objecter
->set_balanced_budget();
271 monclient
.set_messenger(messenger
);
272 mgrclient
.set_messenger(messenger
);
275 messenger
->add_dispatcher_head(&mgrclient
);
276 messenger
->add_dispatcher_tail(objecter
);
277 messenger
->add_dispatcher_tail(this);
281 ldout(cct
, 1) << "setting wanted keys" << dendl
;
282 monclient
.set_want_keys(
283 CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
| CEPH_ENTITY_TYPE_MGR
);
284 ldout(cct
, 1) << "calling monclient init" << dendl
;
285 err
= monclient
.init();
287 ldout(cct
, 0) << conf
->name
<< " initialization error " << cpp_strerror(-err
) << dendl
;
292 err
= monclient
.authenticate(conf
->client_mount_timeout
);
294 ldout(cct
, 0) << conf
->name
<< " authentication error " << cpp_strerror(-err
) << dendl
;
298 messenger
->set_myname(entity_name_t::CLIENT(monclient
.get_global_id()));
300 // Detect older cluster, put mgrclient into compatible mode
301 mgrclient
.set_mgr_optional(
302 !get_required_monitor_features().contains_all(
303 ceph::features::mon::FEATURE_LUMINOUS
));
305 // MgrClient needs this (it doesn't have MonClient reference itself)
306 monclient
.sub_want("mgrmap", 0, 0);
307 monclient
.renew_subs();
309 if (service_daemon
) {
310 ldout(cct
, 10) << __func__
<< " registering as " << service_name
<< "."
311 << daemon_name
<< dendl
;
312 mgrclient
.service_daemon_register(service_name
, daemon_name
,
317 objecter
->set_client_incarnation(0);
326 instance_id
= monclient
.get_global_id();
330 ldout(cct
, 1) << "init done" << dendl
;
335 state
= DISCONNECTED
;
350 void librados::RadosClient::shutdown()
352 std::unique_lock l
{lock
};
353 if (state
== DISCONNECTED
) {
357 bool need_objecter
= false;
358 if (objecter
&& objecter
->initialized
) {
359 need_objecter
= true;
362 if (state
== CONNECTED
) {
364 // make sure watch callbacks are flushed
367 finisher
.wait_for_empty();
370 state
= DISCONNECTED
;
372 timer
.shutdown(); // will drop+retake lock
375 objecter
->shutdown();
377 mgrclient
.shutdown();
379 monclient
.shutdown();
381 messenger
->shutdown();
384 ldout(cct
, 1) << "shutdown" << dendl
;
387 int librados::RadosClient::watch_flush()
389 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
390 ceph::mutex mylock
= ceph::make_mutex("RadosClient::watch_flush::mylock");
391 ceph::condition_variable cond
;
393 objecter
->linger_callback_flush(new C_SafeCond(mylock
, cond
, &done
));
395 std::unique_lock l
{mylock
};
396 cond
.wait(l
, [&done
] { return done
; });
397 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
401 struct C_aio_watch_flush_Complete
: public Context
{
402 librados::RadosClient
*client
;
403 librados::AioCompletionImpl
*c
;
405 C_aio_watch_flush_Complete(librados::RadosClient
*_client
, librados::AioCompletionImpl
*_c
)
406 : client(_client
), c(_c
) {
410 void finish(int r
) override
{
414 c
->cond
.notify_all();
416 if (c
->callback_complete
||
418 client
->finisher
.queue(new librados::C_AioComplete(c
));
424 int librados::RadosClient::async_watch_flush(AioCompletionImpl
*c
)
426 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
427 Context
*oncomplete
= new C_aio_watch_flush_Complete(this, c
);
428 objecter
->linger_callback_flush(oncomplete
);
429 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
433 uint64_t librados::RadosClient::get_instance_id()
438 int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release
)
440 int r
= wait_for_osdmap();
445 objecter
->with_osdmap(
446 [require_osd_release
](const OSDMap
& o
) {
447 *require_osd_release
= ceph::to_integer
<int8_t>(o
.require_osd_release
);
452 int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client
,
453 int8_t* require_min_compat_client
)
455 int r
= wait_for_osdmap();
460 objecter
->with_osdmap(
461 [min_compat_client
, require_min_compat_client
](const OSDMap
& o
) {
463 ceph::to_integer
<int8_t>(o
.get_min_compat_client());
464 *require_min_compat_client
=
465 ceph::to_integer
<int8_t>(o
.get_require_min_compat_client());
470 librados::RadosClient::~RadosClient()
479 int librados::RadosClient::create_ioctx(const char *name
, IoCtxImpl
**io
)
481 int64_t poolid
= lookup_pool(name
);
486 *io
= new librados::IoCtxImpl(this, objecter
, poolid
, CEPH_NOSNAP
);
490 int librados::RadosClient::create_ioctx(int64_t pool_id
, IoCtxImpl
**io
)
492 std::string pool_name
;
493 int r
= pool_get_name(pool_id
, &pool_name
, true);
496 *io
= new librados::IoCtxImpl(this, objecter
, pool_id
, CEPH_NOSNAP
);
500 bool librados::RadosClient::ms_dispatch(Message
*m
)
504 std::lock_guard
l(lock
);
505 if (state
== DISCONNECTED
) {
506 ldout(cct
, 10) << "disconnected, discarding " << *m
<< dendl
;
515 void librados::RadosClient::ms_handle_connect(Connection
*con
)
519 bool librados::RadosClient::ms_handle_reset(Connection
*con
)
524 void librados::RadosClient::ms_handle_remote_reset(Connection
*con
)
528 bool librados::RadosClient::ms_handle_refused(Connection
*con
)
533 bool librados::RadosClient::_dispatch(Message
*m
)
535 ceph_assert(ceph_mutex_is_locked(lock
));
536 switch (m
->get_type()) {
538 case CEPH_MSG_OSD_MAP
:
543 case CEPH_MSG_MDS_MAP
:
548 handle_log(static_cast<MLog
*>(m
));
559 int librados::RadosClient::wait_for_osdmap()
561 ceph_assert(ceph_mutex_is_not_locked_by_me(lock
));
563 if (state
!= CONNECTED
) {
567 bool need_map
= false;
568 objecter
->with_osdmap([&](const OSDMap
& o
) {
569 if (o
.get_epoch() == 0) {
575 std::unique_lock
l(lock
);
577 ceph::timespan timeout
{0};
578 if (cct
->_conf
->rados_mon_op_timeout
> 0) {
579 timeout
= ceph::make_timespan(cct
->_conf
->rados_mon_op_timeout
);
582 if (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
583 ldout(cct
, 10) << __func__
<< " waiting" << dendl
;
584 while (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
585 if (timeout
== timeout
.zero()) {
588 if (cond
.wait_for(l
, timeout
) == std::cv_status::timeout
) {
589 lderr(cct
) << "timed out waiting for first osdmap from monitors"
595 ldout(cct
, 10) << __func__
<< " done waiting" << dendl
;
604 int librados::RadosClient::wait_for_latest_osdmap()
606 ceph::mutex mylock
= ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
607 ceph::condition_variable cond
;
610 objecter
->wait_for_latest_osdmap(new C_SafeCond(mylock
, cond
, &done
));
612 std::unique_lock l
{mylock
};
613 cond
.wait(l
, [&done
] {return done
;});
617 int librados::RadosClient::pool_list(std::list
<std::pair
<int64_t, string
> >& v
)
619 int r
= wait_for_osdmap();
623 objecter
->with_osdmap([&](const OSDMap
& o
) {
624 for (auto p
: o
.get_pools())
625 v
.push_back(std::make_pair(p
.first
, o
.get_pool_name(p
.first
)));
630 int librados::RadosClient::get_pool_stats(std::list
<string
>& pools
,
631 map
<string
,::pool_stat_t
> *result
,
634 ceph::mutex mylock
= ceph::make_mutex("RadosClient::get_pool_stats::mylock");
635 ceph::condition_variable cond
;
639 objecter
->get_pool_stats(pools
, result
, per_pool
,
640 new C_SafeCond(mylock
, cond
, &done
,
643 unique_lock l
{mylock
};
644 cond
.wait(l
, [&done
] { return done
;});
648 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
649 const std::string
& pool
)
652 objecter
->with_osdmap([&](const OSDMap
& osdmap
) {
653 int64_t poolid
= osdmap
.lookup_pg_pool_name(pool
);
655 ret
= osdmap
.get_pg_pool(poolid
)->is_unmanaged_snaps_mode();
660 int librados::RadosClient::get_fs_stats(ceph_statfs
& stats
)
662 ceph::mutex mylock
= ceph::make_mutex("RadosClient::get_fs_stats::mylock");
663 ceph::condition_variable cond
;
667 std::lock_guard l
{mylock
};
668 objecter
->get_fs_stats(stats
, boost::optional
<int64_t> (),
669 new C_SafeCond(mylock
, cond
, &done
, &ret
));
672 std::unique_lock l
{mylock
};
673 cond
.wait(l
, [&done
] { return done
;});
678 void librados::RadosClient::get() {
679 std::lock_guard
l(lock
);
680 ceph_assert(refcnt
> 0);
684 bool librados::RadosClient::put() {
685 std::lock_guard
l(lock
);
686 ceph_assert(refcnt
> 0);
688 return (refcnt
== 0);
691 int librados::RadosClient::pool_create(string
& name
,
697 int r
= wait_for_osdmap();
702 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pool_create::mylock");
704 ceph::condition_variable cond
;
706 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
707 reply
= objecter
->create_pool(name
, onfinish
, crush_rule
);
712 std::unique_lock l
{mylock
};
713 cond
.wait(l
, [&done
] { return done
; });
718 int librados::RadosClient::pool_create_async(string
& name
,
719 PoolAsyncCompletionImpl
*c
,
722 int r
= wait_for_osdmap();
726 Context
*onfinish
= new C_PoolAsync_Safe(c
);
727 r
= objecter
->create_pool(name
, onfinish
, crush_rule
);
734 int librados::RadosClient::pool_get_base_tier(int64_t pool_id
, int64_t* base_tier
)
736 int r
= wait_for_osdmap();
741 objecter
->with_osdmap([&](const OSDMap
& o
) {
742 const pg_pool_t
* pool
= o
.get_pg_pool(pool_id
);
744 if (pool
->tier_of
< 0) {
745 *base_tier
= pool_id
;
747 *base_tier
= pool
->tier_of
;
757 int librados::RadosClient::pool_delete(const char *name
)
759 int r
= wait_for_osdmap();
764 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pool_delete::mylock");
765 ceph::condition_variable cond
;
768 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &ret
);
769 ret
= objecter
->delete_pool(name
, onfinish
);
774 std::unique_lock l
{mylock
};
775 cond
.wait(l
, [&done
] { return done
;});
780 int librados::RadosClient::pool_delete_async(const char *name
, PoolAsyncCompletionImpl
*c
)
782 int r
= wait_for_osdmap();
786 Context
*onfinish
= new C_PoolAsync_Safe(c
);
787 r
= objecter
->delete_pool(name
, onfinish
);
794 void librados::RadosClient::blacklist_self(bool set
) {
795 std::lock_guard
l(lock
);
796 objecter
->blacklist_self(set
);
799 std::string
librados::RadosClient::get_addrs() const {
800 CachedStackStringStream cos
;
801 *cos
<< messenger
->get_myaddrs();
802 return std::string(cos
->strv());
805 int librados::RadosClient::blacklist_add(const string
& client_address
,
806 uint32_t expire_seconds
)
809 if (!addr
.parse(client_address
.c_str(), 0)) {
810 lderr(cct
) << "unable to parse address " << client_address
<< dendl
;
814 std::stringstream cmd
;
816 << "\"prefix\": \"osd blacklist\", "
817 << "\"blacklistop\": \"add\", "
818 << "\"addr\": \"" << client_address
<< "\"";
819 if (expire_seconds
!= 0) {
820 cmd
<< ", \"expire\": " << expire_seconds
<< ".0";
824 std::vector
<std::string
> cmds
;
825 cmds
.push_back(cmd
.str());
827 int r
= mon_command(cmds
, inbl
, NULL
, NULL
);
832 // ensure we have the latest osd map epoch before proceeding
833 r
= wait_for_latest_osdmap();
837 int librados::RadosClient::mon_command(const vector
<string
>& cmd
,
838 const bufferlist
&inbl
,
839 bufferlist
*outbl
, string
*outs
)
842 mon_command_async(cmd
, inbl
, outbl
, outs
, &ctx
);
846 void librados::RadosClient::mon_command_async(const vector
<string
>& cmd
,
847 const bufferlist
&inbl
,
848 bufferlist
*outbl
, string
*outs
,
851 std::lock_guard l
{lock
};
852 monclient
.start_mon_command(cmd
, inbl
, outbl
, outs
, on_finish
);
855 int librados::RadosClient::mgr_command(const vector
<string
>& cmd
,
856 const bufferlist
&inbl
,
857 bufferlist
*outbl
, string
*outs
)
859 std::lock_guard
l(lock
);
862 int r
= mgrclient
.start_command(cmd
, inbl
, outbl
, outs
, &cond
);
867 if (conf
->rados_mon_op_timeout
) {
868 r
= cond
.wait_for(conf
->rados_mon_op_timeout
);
877 int librados::RadosClient::mgr_command(
879 const vector
<string
>& cmd
,
880 const bufferlist
&inbl
,
881 bufferlist
*outbl
, string
*outs
)
883 std::lock_guard
l(lock
);
886 int r
= mgrclient
.start_tell_command(name
, cmd
, inbl
, outbl
, outs
, &cond
);
891 if (conf
->rados_mon_op_timeout
) {
892 r
= cond
.wait_for(conf
->rados_mon_op_timeout
);
902 int librados::RadosClient::mon_command(int rank
, const vector
<string
>& cmd
,
903 const bufferlist
&inbl
,
904 bufferlist
*outbl
, string
*outs
)
906 ceph::mutex mylock
= ceph::make_mutex("RadosClient::mon_command::mylock");
907 ceph::condition_variable cond
;
911 std::lock_guard l
{mylock
};
912 monclient
.start_mon_command(rank
, cmd
, inbl
, outbl
, outs
,
913 new C_SafeCond(mylock
, cond
, &done
, &rval
));
915 std::unique_lock l
{mylock
};
916 cond
.wait(l
, [&done
] { return done
;});
920 int librados::RadosClient::mon_command(string name
, const vector
<string
>& cmd
,
921 const bufferlist
&inbl
,
922 bufferlist
*outbl
, string
*outs
)
924 ceph::mutex mylock
= ceph::make_mutex("RadosClient::mon_command::mylock");
925 ceph::condition_variable cond
;
929 std::lock_guard l
{mylock
};
930 monclient
.start_mon_command(name
, cmd
, inbl
, outbl
, outs
,
931 new C_SafeCond(mylock
, cond
, &done
, &rval
));
933 std::unique_lock l
{mylock
};
934 cond
.wait(l
, [&done
] { return done
;});
938 int librados::RadosClient::osd_command(int osd
, vector
<string
>& cmd
,
939 const bufferlist
& inbl
,
940 bufferlist
*poutbl
, string
*prs
)
942 ceph::mutex mylock
= ceph::make_mutex("RadosClient::osd_command::mylock");
943 ceph::condition_variable cond
;
952 std::lock_guard l
{mylock
};
953 // XXX do anything with tid?
954 objecter
->osd_command(osd
, cmd
, inbl
, &tid
, poutbl
, prs
,
955 new C_SafeCond(mylock
, cond
, &done
, &ret
));
957 std::unique_lock l
{mylock
};
958 cond
.wait(l
, [&done
] { return done
;});
962 int librados::RadosClient::pg_command(pg_t pgid
, vector
<string
>& cmd
,
963 const bufferlist
& inbl
,
964 bufferlist
*poutbl
, string
*prs
)
966 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pg_command::mylock");
967 ceph::condition_variable cond
;
973 std::lock_guard l
{lock
};
974 objecter
->pg_command(pgid
, cmd
, inbl
, &tid
, poutbl
, prs
,
975 new C_SafeCond(mylock
, cond
, &done
, &ret
));
977 std::unique_lock l
{mylock
};
978 cond
.wait(l
, [&done
] { return done
;});
982 int librados::RadosClient::monitor_log(const string
& level
,
983 rados_log_callback_t cb
,
984 rados_log_callback2_t cb2
,
987 std::lock_guard
l(lock
);
989 if (state
!= CONNECTED
) {
993 if (cb
== NULL
&& cb2
== NULL
) {
995 ldout(cct
, 10) << __func__
<< " removing cb " << (void*)log_cb
996 << " " << (void*)log_cb2
<< dendl
;
997 monclient
.sub_unwant(log_watch
);
1006 if (level
== "debug") {
1007 watch_level
= "log-debug";
1008 } else if (level
== "info") {
1009 watch_level
= "log-info";
1010 } else if (level
== "warn" || level
== "warning") {
1011 watch_level
= "log-warn";
1012 } else if (level
== "err" || level
== "error") {
1013 watch_level
= "log-error";
1014 } else if (level
== "sec") {
1015 watch_level
= "log-sec";
1017 ldout(cct
, 10) << __func__
<< " invalid level " << level
<< dendl
;
1021 if (log_cb
|| log_cb2
)
1022 monclient
.sub_unwant(log_watch
);
1025 ldout(cct
, 10) << __func__
<< " add cb " << (void*)cb
<< " " << (void*)cb2
1026 << " level " << level
<< dendl
;
1027 monclient
.sub_want(watch_level
, 0, 0);
1029 monclient
.renew_subs();
1033 log_watch
= watch_level
;
1037 void librados::RadosClient::handle_log(MLog
*m
)
1039 ceph_assert(ceph_mutex_is_locked(lock
));
1040 ldout(cct
, 10) << __func__
<< " version " << m
->version
<< dendl
;
1042 if (log_last_version
< m
->version
) {
1043 log_last_version
= m
->version
;
1045 if (log_cb
|| log_cb2
) {
1046 for (std::deque
<LogEntry
>::iterator it
= m
->entries
.begin(); it
!= m
->entries
.end(); ++it
) {
1049 ss
<< e
.stamp
<< " " << e
.name
<< " " << e
.prio
<< " " << e
.msg
;
1050 string line
= ss
.str();
1051 string who
= stringify(e
.rank
) + " " + stringify(e
.addrs
);
1052 string name
= stringify(e
.name
);
1053 string level
= stringify(e
.prio
);
1054 struct timespec stamp
;
1055 e
.stamp
.to_timespec(&stamp
);
1057 ldout(cct
, 20) << __func__
<< " delivering " << ss
.str() << dendl
;
1059 log_cb(log_cb_arg
, line
.c_str(), who
.c_str(),
1060 stamp
.tv_sec
, stamp
.tv_nsec
,
1061 e
.seq
, level
.c_str(), e
.msg
.c_str());
1063 log_cb2(log_cb_arg
, line
.c_str(),
1065 who
.c_str(), name
.c_str(),
1066 stamp
.tv_sec
, stamp
.tv_nsec
,
1067 e
.seq
, level
.c_str(), e
.msg
.c_str());
1071 monclient
.sub_got(log_watch
, log_last_version
);
1077 int librados::RadosClient::service_daemon_register(
1078 const std::string
& service
, ///< service name (e.g., 'rgw')
1079 const std::string
& name
, ///< daemon name (e.g., 'gwfoo')
1080 const std::map
<std::string
,std::string
>& metadata
)
1082 if (service_daemon
) {
1085 if (service
== "osd" ||
1087 service
== "client" ||
1090 // normal ceph entity types are not allowed!
1093 if (service
.empty() || name
.empty()) {
1097 collect_sys_info(&daemon_metadata
, cct
);
1099 ldout(cct
,10) << __func__
<< " " << service
<< "." << name
<< dendl
;
1100 service_daemon
= true;
1101 service_name
= service
;
1103 daemon_metadata
.insert(metadata
.begin(), metadata
.end());
1105 if (state
== DISCONNECTED
) {
1108 if (state
== CONNECTING
) {
1111 mgrclient
.service_daemon_register(service_name
, daemon_name
,
1116 int librados::RadosClient::service_daemon_update_status(
1117 std::map
<std::string
,std::string
>&& status
)
1119 if (state
!= CONNECTED
) {
1122 return mgrclient
.service_daemon_update_status(std::move(status
));
1125 mon_feature_t
librados::RadosClient::get_required_monitor_features() const
1127 return monclient
.with_monmap([](const MonMap
&monmap
) {
1128 return monmap
.get_required_features(); } );
1131 int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id
,
1132 std::vector
<std::string
>* pgs
)
1134 vector
<string
> cmd
= {
1135 "{\"prefix\": \"pg ls\","
1136 "\"pool\": " + std::to_string(pool_id
) + ","
1137 "\"states\": [\"inconsistent\"],"
1138 "\"format\": \"json\"}"
1140 bufferlist inbl
, outbl
;
1142 if (auto ret
= mgr_command(cmd
, inbl
, &outbl
, &outstring
); ret
) {
1145 if (!outbl
.length()) {
1150 if (!parser
.parse(outbl
.c_str(), outbl
.length())) {
1154 if (!parser
.is_array()) {
1155 JSONObj
*pgstat_obj
= parser
.find_obj("pg_stats");
1158 auto s
= pgstat_obj
->get_data();
1159 JSONParser pg_stats
;
1160 if (!pg_stats
.parse(s
.c_str(), s
.length())) {
1163 v
= pg_stats
.get_array_elements();
1165 v
= parser
.get_array_elements();
1169 if (!pg_json
.parse(i
.c_str(), i
.length())) {
1173 JSONDecoder::decode_json("pgid", pgid
, &pg_json
);
1174 pgs
->emplace_back(std::move(pgid
));