1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/ceph_json.h"
29 #include "common/errno.h"
30 #include "common/ceph_json.h"
31 #include "include/buffer.h"
32 #include "include/stringify.h"
33 #include "include/util.h"
35 #include "messages/MLog.h"
36 #include "msg/Messenger.h"
38 // needed for static_cast
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MPoolOpReply.h"
41 #include "messages/MStatfsReply.h"
42 #include "messages/MGetPoolStatsReply.h"
43 #include "messages/MOSDOpReply.h"
44 #include "messages/MOSDMap.h"
45 #include "messages/MCommandReply.h"
47 #include "AioCompletionImpl.h"
48 #include "IoCtxImpl.h"
49 #include "PoolAsyncCompletionImpl.h"
50 #include "RadosClient.h"
52 #include "include/ceph_assert.h"
53 #include "common/EventTrace.h"
55 #define dout_subsys ceph_subsys_rados
57 #define dout_prefix *_dout << "librados: "
// Constructs a RadosClient bound to the given CephContext.
// Visible initializers: Dispatcher receives cct_->get(); cct_deleter wraps
// cct_ with a deleter that calls put() on it; mgrclient is built from cct_ and
// the monclient's monmap; the log state starts at version 0 with null
// callbacks and argument; the finisher is named "radosclient".
// The body registers this object as a config observer.
// NOTE(review): other member initializers are not visible in this listing.
59 librados::RadosClient::RadosClient(CephContext
*cct_
)
60 : Dispatcher(cct_
->get()),
61 cct_deleter
{cct_
, [](CephContext
*p
) {p
->put();}},
65 mgrclient(cct_
, nullptr, &monclient
.monmap
),
71 log_last_version(0), log_cb(NULL
), log_cb2(NULL
), log_cb_arg(NULL
),
72 finisher(cct
, "radosclient", "fn-radosclient")
74 conf
.add_observer(this);
// Resolves a pool name to its pool id.
// Waits for an osdmap (wait_for_osdmap), then looks the name up through
// OSDMap::lookup_pg_pool_name. A second lookup is made after
// wait_for_latest_osdmap(); presumably this retry only runs when the first
// lookup fails -- the guarding condition is not visible here, TODO confirm.
77 int64_t librados::RadosClient::lookup_pool(const char *name
)
79 int r
= wait_for_osdmap();
84 int64_t ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
87 // Make sure we have the latest map
88 int r
= wait_for_latest_osdmap();
91 ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
// Boolean convenience wrapper around pool_requires_alignment2().
// NOTE(review): per the comment below, an unknown answer is reported as
// false; callers that need to tell "no" from "could not determine" should use
// pool_requires_alignment2() and check its int result.
98 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id
)
101 int r
= pool_requires_alignment2(pool_id
, &requires
);
103 // Cast answer to false, this is a little bit problematic
104 // since we really don't know the answer yet, say.
// Writes into *requires whether the pool identified by pool_id requires
// aligned appends (requires_aligned_append() on the pool returned by
// o.get_pg_pool(pool_id)).
// Waits for an osdmap first; inside with_osdmap the pool's existence is
// checked with have_pg_pool() before it is dereferenced.
// NOTE(review): the values returned on each path are not visible here.
111 // a safer version of pool_requires_alignment
112 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id
,
118 int r
= wait_for_osdmap();
123 return objecter
->with_osdmap([requires
, pool_id
](const OSDMap
& o
) {
124 if (!o
.have_pg_pool(pool_id
)) {
127 *requires
= o
.get_pg_pool(pool_id
)->requires_aligned_append();
// Convenience wrapper around pool_required_alignment2() that hands it a local
// `alignment` output variable.
// NOTE(review): how a failing r is mapped onto the uint64_t result is not
// visible in this listing.
132 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id
)
135 int r
= pool_required_alignment2(pool_id
, &alignment
);
// Writes into *alignment the value of required_alignment() for the pool
// identified by pool_id.
// Waits for an osdmap first; inside with_osdmap the pool's existence is
// checked with have_pg_pool() before it is dereferenced.
// NOTE(review): the values returned on each path are not visible here.
143 // a safer version of pool_required_alignment
144 int librados::RadosClient::pool_required_alignment2(int64_t pool_id
,
150 int r
= wait_for_osdmap();
155 return objecter
->with_osdmap([alignment
, pool_id
](const OSDMap
&o
) {
156 if (!o
.have_pg_pool(pool_id
)) {
159 *alignment
= o
.get_pg_pool(pool_id
)->required_alignment();
// Looks up the name of pool_id in the osdmap and stores it in *s.
// If the result is -ENOENT and wait_latest_map is set, the latest osdmap is
// awaited and wait_latest_map is cleared, presumably so the lookup is retried
// exactly once against the newest map (the retry jump itself is not visible
// in this listing).
164 int librados::RadosClient::pool_get_name(uint64_t pool_id
, std::string
*s
, bool wait_latest_map
)
166 int r
= wait_for_osdmap();
170 objecter
->with_osdmap([&](const OSDMap
& o
) {
171 if (!o
.have_pg_pool(pool_id
)) {
175 *s
= o
.get_pool_name(pool_id
);
178 if (r
== -ENOENT
&& wait_latest_map
) {
179 r
= wait_for_latest_osdmap();
182 wait_latest_map
= false;
// Formats the cluster fsid reported by the monclient.
// Takes `lock`, then streams monclient.get_fsid() into `oss`; presumably the
// resulting text is assigned to *s -- that assignment is not visible here.
189 int librados::RadosClient::get_fsid(std::string
*s
)
193 std::lock_guard
l(lock
);
195 oss
<< monclient
.get_fsid();
// Pings the monitor named mon_id through the monclient; `result` is passed
// straight through to monclient.ping_monitor().
// When the client is not in the CONNECTED state the initial monmap is built
// first, because (per the comment below) only connect() is known to have
// built it.
200 int librados::RadosClient::ping_monitor(const string mon_id
, string
*result
)
203 /* If we haven't yet connected, we have no way of telling whether we
204 * already built monc's initial monmap. IF we are in CONNECTED state,
205 * then it is safe to assume that we went through connect(), which does
208 if (state
!= CONNECTED
) {
209 ldout(cct
, 10) << __func__
<< " build monmap" << dendl
;
210 err
= monclient
.build_initial_monmap();
216 err
= monclient
.ping_monitor(mon_id
, result
);
// Establishes the client's connection to the cluster.
// Visible steps, in listing order: check for the CONNECTING / CONNECTED
// states; check log_early and whether the log has started; fetch monmap and
// config with a temporary bootstrap MonClient; common_init_finish(); build the
// initial monmap; create the client messenger and set a lossy-client default
// policy; create the Objecter; attach the messenger to monclient and
// mgrclient; register mgrclient, objecter and this as dispatchers; set the
// wanted keys, init and authenticate the monclient; name the messenger after
// the global id; configure mgrclient; subscribe to "mgrmap"; optionally
// register as a service daemon; record instance_id.
// NOTE(review): the error checks and return statements between these steps
// are not visible in this listing. The final visible statement resets state
// to DISCONNECTED, presumably on the failure path -- TODO confirm.
220 int librados::RadosClient::connect()
224 // already connected?
225 if (state
== CONNECTING
)
227 if (state
== CONNECTED
)
231 if (cct
->_conf
->log_early
&&
232 !cct
->_log
->is_started()) {
// Bootstrap step: a throwaway MonClient fetches the monmap and config.
237 MonClient
mc_bootstrap(cct
);
238 err
= mc_bootstrap
.get_monmap_and_config();
243 common_init_finish(cct
);
246 err
= monclient
.build_initial_monmap();
251 messenger
= Messenger::create_client_messenger(cct
, "radosclient");
255 // require OSDREPLYMUX feature. this means we will fail to talk to
256 // old servers. this is necessary because otherwise we won't know
257 // how to decompose the reply data into its constituent pieces.
258 messenger
->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX
));
260 ldout(cct
, 1) << "starting msgr at " << messenger
->get_myaddrs() << dendl
;
262 ldout(cct
, 1) << "starting objecter" << dendl
;
// std::nothrow: allocation failure yields a null pointer rather than throwing.
264 objecter
= new (std::nothrow
) Objecter(cct
, messenger
, &monclient
, &finisher
);
267 objecter
->set_balanced_budget();
269 monclient
.set_messenger(messenger
);
270 mgrclient
.set_messenger(messenger
);
// Dispatcher order: mgrclient at the head, then objecter, then this client.
273 messenger
->add_dispatcher_head(&mgrclient
);
274 messenger
->add_dispatcher_tail(objecter
);
275 messenger
->add_dispatcher_tail(this);
279 ldout(cct
, 1) << "setting wanted keys" << dendl
;
280 monclient
.set_want_keys(
281 CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
| CEPH_ENTITY_TYPE_MGR
);
282 ldout(cct
, 1) << "calling monclient init" << dendl
;
283 err
= monclient
.init();
285 ldout(cct
, 0) << conf
->name
<< " initialization error " << cpp_strerror(-err
) << dendl
;
290 err
= monclient
.authenticate(conf
->client_mount_timeout
);
292 ldout(cct
, 0) << conf
->name
<< " authentication error " << cpp_strerror(-err
) << dendl
;
296 messenger
->set_myname(entity_name_t::CLIENT(monclient
.get_global_id()));
298 // Detect older cluster, put mgrclient into compatible mode
299 mgrclient
.set_mgr_optional(
300 !get_required_monitor_features().contains_all(
301 ceph::features::mon::FEATURE_LUMINOUS
));
303 // MgrClient needs this (it doesn't have MonClient reference itself)
304 monclient
.sub_want("mgrmap", 0, 0);
305 monclient
.renew_subs();
307 if (service_daemon
) {
308 ldout(cct
, 10) << __func__
<< " registering as " << service_name
<< "."
309 << daemon_name
<< dendl
;
310 mgrclient
.service_daemon_register(service_name
, daemon_name
,
315 objecter
->set_client_incarnation(0);
// The instance id is the global id assigned by the monitors.
324 instance_id
= monclient
.get_global_id();
328 ldout(cct
, 1) << "init done" << dendl
;
333 state
= DISCONNECTED
;
// Tears the client down.
// Takes `lock`, checks for the DISCONNECTED state up front (presumably
// returning early), records whether an initialized objecter needs shutting
// down, and -- when CONNECTED -- waits for the finisher to drain so watch
// callbacks are flushed. State then becomes DISCONNECTED and the timer,
// objecter, mgrclient, monclient and messenger are shut down in listing order.
// NOTE(review): the guards around the individual shutdown calls and the
// points where `l` is released are not visible in this listing.
348 void librados::RadosClient::shutdown()
350 std::unique_lock l
{lock
};
351 if (state
== DISCONNECTED
) {
355 bool need_objecter
= false;
356 if (objecter
&& objecter
->initialized
) {
357 need_objecter
= true;
360 if (state
== CONNECTED
) {
362 // make sure watch callbacks are flushed
365 finisher
.wait_for_empty();
368 state
= DISCONNECTED
;
370 timer
.shutdown(); // will drop+retake lock
373 objecter
->shutdown();
375 mgrclient
.shutdown();
377 monclient
.shutdown();
379 messenger
->shutdown();
382 ldout(cct
, 1) << "shutdown" << dendl
;
// Synchronously flushes linger (watch) callbacks.
// Hands objecter->linger_callback_flush() a C_SafeCond bound to a local
// mutex, condition variable and `done` flag, then blocks on the condition
// variable until `done` becomes true.
385 int librados::RadosClient::watch_flush()
387 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
388 ceph::mutex mylock
= ceph::make_mutex("RadosClient::watch_flush::mylock");
389 ceph::condition_variable cond
;
391 objecter
->linger_callback_flush(new C_SafeCond(mylock
, cond
, &done
));
// The predicate form of wait() tolerates spurious wakeups.
393 std::unique_lock l
{mylock
};
394 cond
.wait(l
, [&done
] { return done
; });
395 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
// Completion context for an asynchronous watch flush.
// Holds the RadosClient and the AioCompletionImpl it was created for.
// finish() wakes waiters on the completion's condition variable and, when a
// completion callback is set (the full condition is only partly visible
// here), queues a C_AioComplete on the client's finisher.
399 struct C_aio_watch_flush_Complete
: public Context
{
400 librados::RadosClient
*client
;
401 librados::AioCompletionImpl
*c
;
403 C_aio_watch_flush_Complete(librados::RadosClient
*_client
, librados::AioCompletionImpl
*_c
)
404 : client(_client
), c(_c
) {
408 void finish(int r
) override
{
412 c
->cond
.notify_all();
414 if (c
->callback_complete
||
416 client
->finisher
.queue(new librados::C_AioComplete(c
));
// Asynchronous counterpart of watch_flush().
// Wraps the completion `c` in a C_aio_watch_flush_Complete and passes it to
// objecter->linger_callback_flush(); no wait on the result is visible here.
422 int librados::RadosClient::async_watch_flush(AioCompletionImpl
*c
)
424 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
425 Context
*oncomplete
= new C_aio_watch_flush_Complete(this, c
);
426 objecter
->linger_callback_flush(oncomplete
);
427 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
// Accessor for the client's instance id.
// NOTE(review): the body is not visible in this listing; presumably it
// returns the instance_id member that connect() sets from
// monclient.get_global_id() -- TODO confirm.
431 uint64_t librados::RadosClient::get_instance_id()
// Reports the osdmap's require_osd_release value.
// Waits for an osdmap, then stores o.require_osd_release, converted to int8_t
// with ceph::to_integer, into *require_osd_release.
436 int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release
)
438 int r
= wait_for_osdmap();
443 objecter
->with_osdmap(
444 [require_osd_release
](const OSDMap
& o
) {
445 *require_osd_release
= ceph::to_integer
<int8_t>(o
.require_osd_release
);
// Reports the osdmap's client compatibility levels as int8_t values.
// Waits for an osdmap, then converts o.get_min_compat_client() and
// o.get_require_min_compat_client() with ceph::to_integer. The second result
// is stored in *require_min_compat_client; the first is presumably stored in
// *min_compat_client (its assignment target is not visible in this listing).
450 int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client
,
451 int8_t* require_min_compat_client
)
453 int r
= wait_for_osdmap();
458 objecter
->with_osdmap(
459 [min_compat_client
, require_min_compat_client
](const OSDMap
& o
) {
461 ceph::to_integer
<int8_t>(o
.get_min_compat_client());
462 *require_min_compat_client
=
463 ceph::to_integer
<int8_t>(o
.get_require_min_compat_client());
// Destructor: unregisters this object as a config observer, mirroring the
// add_observer(this) call in the constructor.
// NOTE(review): any further cleanup is not visible in this listing.
468 librados::RadosClient::~RadosClient()
470 conf
.remove_observer(this);
// Creates an IoCtxImpl for the pool with the given name.
// Resolves the name with lookup_pool() and stores a newly allocated IoCtxImpl
// (snapshot id CEPH_NOSNAP) in *io; the caller receives the raw pointer.
// NOTE(review): the handling of a failed lookup is not visible here.
478 int librados::RadosClient::create_ioctx(const char *name
, IoCtxImpl
**io
)
480 int64_t poolid
= lookup_pool(name
);
485 *io
= new librados::IoCtxImpl(this, objecter
, poolid
, CEPH_NOSNAP
);
// Creates an IoCtxImpl for the pool with the given id.
// Calls pool_get_name() with wait_latest_map = true; the fetched name is not
// used by any visible line, so the call presumably only verifies that the
// pool exists. Then stores a newly allocated IoCtxImpl (snapshot id
// CEPH_NOSNAP) in *io.
489 int librados::RadosClient::create_ioctx(int64_t pool_id
, IoCtxImpl
**io
)
491 std::string pool_name
;
492 int r
= pool_get_name(pool_id
, &pool_name
, true);
495 *io
= new librados::IoCtxImpl(this, objecter
, pool_id
, CEPH_NOSNAP
);
// Entry point for an incoming Message.
// Takes `lock`; when the client is DISCONNECTED the message is logged as
// discarded. NOTE(review): the remaining handling (presumably a call to
// _dispatch(), which asserts that `lock` is held) is not visible here.
499 bool librados::RadosClient::ms_dispatch(Message
*m
)
503 std::lock_guard
l(lock
);
504 if (state
== DISCONNECTED
) {
505 ldout(cct
, 10) << "disconnected, discarding " << *m
<< dendl
;
// Connection-established hook taking the affected Connection.
// NOTE(review): the body is not visible in this listing.
514 void librados::RadosClient::ms_handle_connect(Connection
*con
)
// Connection-reset hook taking the affected Connection; returns bool.
// NOTE(review): the body is not visible in this listing.
518 bool librados::RadosClient::ms_handle_reset(Connection
*con
)
// Remote-reset hook taking the affected Connection.
// NOTE(review): the body is not visible in this listing.
523 void librados::RadosClient::ms_handle_remote_reset(Connection
*con
)
// Connection-refused hook taking the affected Connection; returns bool.
// NOTE(review): the body is not visible in this listing.
527 bool librados::RadosClient::ms_handle_refused(Connection
*con
)
// Message-type switch; must be called with `lock` held (asserted).
// Visible cases are CEPH_MSG_OSD_MAP and CEPH_MSG_MDS_MAP, plus a branch that
// forwards the message, cast to MLog, to handle_log().
// NOTE(review): the case bodies and the return values are not visible here.
532 bool librados::RadosClient::_dispatch(Message
*m
)
534 ceph_assert(ceph_mutex_is_locked(lock
));
535 switch (m
->get_type()) {
537 case CEPH_MSG_OSD_MAP
:
542 case CEPH_MSG_MDS_MAP
:
547 handle_log(static_cast<MLog
*>(m
));
// Blocks until the objecter holds an osdmap with a non-zero epoch.
// The caller must NOT hold `lock` (asserted), and the state is checked
// against CONNECTED. If the current epoch is 0, `lock` is taken and the code
// waits on `cond`, re-checking the epoch in a loop. The wait uses
// rados_mon_op_timeout; a timed-out wait logs an error.
// NOTE(review): the branch taken when the timeout equals zero is not visible
// here (presumably an unbounded wait), nor are the return values.
558 int librados::RadosClient::wait_for_osdmap()
560 ceph_assert(ceph_mutex_is_not_locked_by_me(lock
));
562 if (state
!= CONNECTED
) {
566 bool need_map
= false;
567 objecter
->with_osdmap([&](const OSDMap
& o
) {
568 if (o
.get_epoch() == 0) {
574 std::unique_lock
l(lock
);
576 ceph::timespan timeout
= rados_mon_op_timeout
;
// Re-check the epoch now that `lock` is held.
577 if (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
578 ldout(cct
, 10) << __func__
<< " waiting" << dendl
;
579 while (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
580 if (timeout
== timeout
.zero()) {
583 if (cond
.wait_for(l
, timeout
) == std::cv_status::timeout
) {
584 lderr(cct
) << "timed out waiting for first osdmap from monitors"
590 ldout(cct
, 10) << __func__
<< " done waiting" << dendl
;
// Blocks until the objecter reports that it has the latest osdmap.
// Hands objecter->wait_for_latest_osdmap() a C_SafeCond bound to a local
// mutex, condition variable and `done` flag, then waits until `done` is set.
599 int librados::RadosClient::wait_for_latest_osdmap()
601 ceph::mutex mylock
= ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
602 ceph::condition_variable cond
;
605 objecter
->wait_for_latest_osdmap(new C_SafeCond(mylock
, cond
, &done
));
607 std::unique_lock l
{mylock
};
608 cond
.wait(l
, [&done
] {return done
;});
// Appends one (pool id, pool name) pair to `v` for every pool in the osdmap.
// Waits for an osdmap first. Existing contents of `v` are kept, since only
// push_back is used.
// NOTE(review): `for (auto p : ...)` copies each map entry per iteration;
// `const auto&` would avoid the copy -- consider changing.
612 int librados::RadosClient::pool_list(std::list
<std::pair
<int64_t, string
> >& v
)
614 int r
= wait_for_osdmap();
618 objecter
->with_osdmap([&](const OSDMap
& o
) {
619 for (auto p
: o
.get_pools())
620 v
.push_back(std::make_pair(p
.first
, o
.get_pool_name(p
.first
)));
// Synchronously fetches statistics for the named pools.
// Forwards `pools`, `result` and `per_pool` to objecter->get_pool_stats()
// with a C_SafeCond completion, then blocks on a local condition variable
// until `done` is set.
// NOTE(review): the remaining parameters and the return value are not
// visible in this listing.
625 int librados::RadosClient::get_pool_stats(std::list
<string
>& pools
,
626 map
<string
,::pool_stat_t
> *result
,
629 ceph::mutex mylock
= ceph::make_mutex("RadosClient::get_pool_stats::mylock");
630 ceph::condition_variable cond
;
634 objecter
->get_pool_stats(pools
, result
, per_pool
,
635 new C_SafeCond(mylock
, cond
, &done
,
638 unique_lock l
{mylock
};
639 cond
.wait(l
, [&done
] { return done
;});
// Reports whether the named pool is in unmanaged (self-managed) snaps mode.
// Looks the pool id up by name in the osdmap and reads
// is_unmanaged_snaps_mode() from the pool.
// NOTE(review): no wait_for_osdmap() call is visible here, and any guard on
// a failed name lookup is not visible either -- TODO confirm.
643 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
644 const std::string
& pool
)
647 objecter
->with_osdmap([&](const OSDMap
& osdmap
) {
648 int64_t poolid
= osdmap
.lookup_pg_pool_name(pool
);
650 ret
= osdmap
.get_pg_pool(poolid
)->is_unmanaged_snaps_mode();
// Synchronously fetches cluster-wide usage statistics into `stats`.
// Holds the local `mylock` while issuing objecter->get_fs_stats() with an
// empty boost::optional<int64_t> and a C_SafeCond that records completion in
// `done` and the result in `ret`; then waits until `done` is set.
655 int librados::RadosClient::get_fs_stats(ceph_statfs
& stats
)
657 ceph::mutex mylock
= ceph::make_mutex("RadosClient::get_fs_stats::mylock");
658 ceph::condition_variable cond
;
662 std::lock_guard l
{mylock
};
663 objecter
->get_fs_stats(stats
, boost::optional
<int64_t> (),
664 new C_SafeCond(mylock
, cond
, &done
, &ret
));
667 std::unique_lock l
{mylock
};
668 cond
.wait(l
, [&done
] { return done
;});
// Reference-count acquire. Takes `lock` and asserts that refcnt is still
// positive; the increment itself is not visible in this listing.
673 void librados::RadosClient::get() {
674 std::lock_guard
l(lock
);
675 ceph_assert(refcnt
> 0);
// Reference-count release. Takes `lock`, asserts that refcnt is positive and
// returns true when refcnt is 0 afterwards. The decrement itself is not
// visible in this listing.
679 bool librados::RadosClient::put() {
680 std::lock_guard
l(lock
);
681 ceph_assert(refcnt
> 0);
683 return (refcnt
== 0);
// Synchronously creates a pool named `name`.
// Waits for an osdmap, then calls objecter->create_pool() with `crush_rule`
// and a C_SafeCond that records completion in `done` and the result in
// `reply`; `reply` also receives create_pool()'s immediate return value.
// Finally waits on the local condition variable until `done` is set.
// NOTE(review): the check of the immediate return value before waiting is
// not visible in this listing.
686 int librados::RadosClient::pool_create(string
& name
,
692 int r
= wait_for_osdmap();
697 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pool_create::mylock");
699 ceph::condition_variable cond
;
701 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
702 reply
= objecter
->create_pool(name
, onfinish
, crush_rule
);
707 std::unique_lock l
{mylock
};
708 cond
.wait(l
, [&done
] { return done
; });
// Asynchronous pool creation.
// Waits for an osdmap, wraps the completion `c` in a C_PoolAsync_Safe and
// passes it to objecter->create_pool() along with `crush_rule`.
// NOTE(review): cleanup of `onfinish` when create_pool() fails is not
// visible in this listing.
713 int librados::RadosClient::pool_create_async(string
& name
,
714 PoolAsyncCompletionImpl
*c
,
717 int r
= wait_for_osdmap();
721 Context
*onfinish
= new C_PoolAsync_Safe(c
);
722 r
= objecter
->create_pool(name
, onfinish
, crush_rule
);
// Determines the base tier of pool_id.
// Waits for an osdmap and fetches the pool. If tier_of is negative the pool
// is its own base and *base_tier = pool_id; otherwise *base_tier = tier_of.
// NOTE(review): the null check on `pool` is not visible in this listing.
729 int librados::RadosClient::pool_get_base_tier(int64_t pool_id
, int64_t* base_tier
)
731 int r
= wait_for_osdmap();
736 objecter
->with_osdmap([&](const OSDMap
& o
) {
737 const pg_pool_t
* pool
= o
.get_pg_pool(pool_id
);
739 if (pool
->tier_of
< 0) {
740 *base_tier
= pool_id
;
742 *base_tier
= pool
->tier_of
;
// Synchronously deletes the pool named `name`.
// Waits for an osdmap, then calls objecter->delete_pool() with a C_SafeCond
// that records completion in `done` and the result in `ret`; `ret` also
// receives delete_pool()'s immediate return value. Finally waits on the
// local condition variable until `done` is set.
752 int librados::RadosClient::pool_delete(const char *name
)
754 int r
= wait_for_osdmap();
759 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pool_delete::mylock");
760 ceph::condition_variable cond
;
763 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &ret
);
764 ret
= objecter
->delete_pool(name
, onfinish
);
769 std::unique_lock l
{mylock
};
770 cond
.wait(l
, [&done
] { return done
;});
// Asynchronous pool deletion.
// Waits for an osdmap, wraps the completion `c` in a C_PoolAsync_Safe and
// passes it to objecter->delete_pool().
775 int librados::RadosClient::pool_delete_async(const char *name
, PoolAsyncCompletionImpl
*c
)
777 int r
= wait_for_osdmap();
781 Context
*onfinish
= new C_PoolAsync_Safe(c
);
782 r
= objecter
->delete_pool(name
, onfinish
);
// Forwards the `set` flag to objecter->blacklist_self() while holding `lock`.
789 void librados::RadosClient::blacklist_self(bool set
) {
790 std::lock_guard
l(lock
);
791 objecter
->blacklist_self(set
);
// Returns the messenger's own addresses (get_myaddrs()) formatted as a
// string. The text is built in a CachedStackStringStream and copied out.
794 std::string
librados::RadosClient::get_addrs() const {
795 CachedStackStringStream cos
;
796 *cos
<< messenger
->get_myaddrs();
797 return std::string(cos
->strv());
// Blacklists a client address through the monitors.
// Parses client_address (a parse failure is logged), builds a JSON
// "osd blacklist" command with blacklistop "add" and the address, and adds
// an "expire" field only when expire_seconds is non-zero. The command is
// sent with mon_command(), after which the latest osdmap is awaited so the
// change is visible before returning.
// NOTE(review): client_address is inserted into the JSON text without
// escaping; the preceding addr.parse() is presumably what constrains it.
800 int librados::RadosClient::blacklist_add(const string
& client_address
,
801 uint32_t expire_seconds
)
804 if (!addr
.parse(client_address
.c_str(), 0)) {
805 lderr(cct
) << "unable to parse address " << client_address
<< dendl
;
809 std::stringstream cmd
;
811 << "\"prefix\": \"osd blacklist\", "
812 << "\"blacklistop\": \"add\", "
813 << "\"addr\": \"" << client_address
<< "\"";
814 if (expire_seconds
!= 0) {
// ".0" turns the integer into a JSON number with a fractional part.
815 cmd
<< ", \"expire\": " << expire_seconds
<< ".0";
819 std::vector
<std::string
> cmds
;
820 cmds
.push_back(cmd
.str());
822 int r
= mon_command(cmds
, inbl
, NULL
, NULL
);
827 // ensure we have the latest osd map epoch before proceeding
828 r
= wait_for_latest_osdmap();
// Synchronous monitor command.
// Delegates to mon_command_async() with a local completion `ctx`;
// presumably the function then waits on `ctx` and returns its result -- the
// wait is not visible in this listing.
832 int librados::RadosClient::mon_command(const vector
<string
>& cmd
,
833 const bufferlist
&inbl
,
834 bufferlist
*outbl
, string
*outs
)
837 mon_command_async(cmd
, inbl
, outbl
, outs
, &ctx
);
// Asynchronous monitor command.
// Takes `lock` and starts the command with monclient.start_mon_command(),
// passing `on_finish` as the completion.
841 void librados::RadosClient::mon_command_async(const vector
<string
>& cmd
,
842 const bufferlist
&inbl
,
843 bufferlist
*outbl
, string
*outs
,
846 std::lock_guard l
{lock
};
847 monclient
.start_mon_command(cmd
, inbl
, outbl
, outs
, on_finish
);
// Synchronous manager command.
// Takes `lock` and starts the command with mgrclient.start_command(),
// passing `cond` as the completion. When rados_mon_op_timeout is positive
// the wait on `cond` is bounded by it.
// NOTE(review): the untimed branch and any release of `lock` around the
// wait are not visible in this listing.
850 int librados::RadosClient::mgr_command(const vector
<string
>& cmd
,
851 const bufferlist
&inbl
,
852 bufferlist
*outbl
, string
*outs
)
854 std::lock_guard
l(lock
);
857 int r
= mgrclient
.start_command(cmd
, inbl
, outbl
, outs
, &cond
);
862 if (rados_mon_op_timeout
.count() > 0) {
863 r
= cond
.wait_for(rados_mon_op_timeout
);
// Synchronous manager command addressed by `name`.
// Takes `lock` and starts the command with mgrclient.start_tell_command(),
// passing `cond` as the completion. When rados_mon_op_timeout is positive
// the wait on `cond` is bounded by it.
// NOTE(review): the declaration of the `name` parameter, the untimed branch
// and any release of `lock` around the wait are not visible in this listing.
872 int librados::RadosClient::mgr_command(
874 const vector
<string
>& cmd
,
875 const bufferlist
&inbl
,
876 bufferlist
*outbl
, string
*outs
)
878 std::lock_guard
l(lock
);
881 int r
= mgrclient
.start_tell_command(name
, cmd
, inbl
, outbl
, outs
, &cond
);
886 if (rados_mon_op_timeout
.count() > 0) {
887 r
= cond
.wait_for(rados_mon_op_timeout
);
// Synchronous command to the monitor with the given rank.
// Holds the local `mylock` while starting the command with a C_SafeCond
// that records completion in `done` and the result in `rval`, then waits on
// the condition variable until `done` is set.
897 int librados::RadosClient::mon_command(int rank
, const vector
<string
>& cmd
,
898 const bufferlist
&inbl
,
899 bufferlist
*outbl
, string
*outs
)
901 ceph::mutex mylock
= ceph::make_mutex("RadosClient::mon_command::mylock");
902 ceph::condition_variable cond
;
906 std::lock_guard l
{mylock
};
907 monclient
.start_mon_command(rank
, cmd
, inbl
, outbl
, outs
,
908 new C_SafeCond(mylock
, cond
, &done
, &rval
));
910 std::unique_lock l
{mylock
};
911 cond
.wait(l
, [&done
] { return done
;});
// Synchronous command to the monitor with the given name.
// Holds the local `mylock` while starting the command with a C_SafeCond
// that records completion in `done` and the result in `rval`, then waits on
// the condition variable until `done` is set.
915 int librados::RadosClient::mon_command(string name
, const vector
<string
>& cmd
,
916 const bufferlist
&inbl
,
917 bufferlist
*outbl
, string
*outs
)
919 ceph::mutex mylock
= ceph::make_mutex("RadosClient::mon_command::mylock");
920 ceph::condition_variable cond
;
924 std::lock_guard l
{mylock
};
925 monclient
.start_mon_command(name
, cmd
, inbl
, outbl
, outs
,
926 new C_SafeCond(mylock
, cond
, &done
, &rval
));
928 std::unique_lock l
{mylock
};
929 cond
.wait(l
, [&done
] { return done
;});
// Synchronous command to the OSD with id `osd`.
// Holds the local `mylock` while issuing objecter->osd_command() with a
// C_SafeCond that records completion in `done` and the result in `ret`, then
// waits until `done` is set. The transaction id written to `tid` is not used
// by any visible line (see the XXX note below).
// NOTE(review): validation of `osd` is not visible in this listing.
933 int librados::RadosClient::osd_command(int osd
, vector
<string
>& cmd
,
934 const bufferlist
& inbl
,
935 bufferlist
*poutbl
, string
*prs
)
937 ceph::mutex mylock
= ceph::make_mutex("RadosClient::osd_command::mylock");
938 ceph::condition_variable cond
;
947 std::lock_guard l
{mylock
};
948 // XXX do anything with tid?
949 objecter
->osd_command(osd
, cmd
, inbl
, &tid
, poutbl
, prs
,
950 new C_SafeCond(mylock
, cond
, &done
, &ret
));
952 std::unique_lock l
{mylock
};
953 cond
.wait(l
, [&done
] { return done
;});
// Synchronous command to the placement group `pgid`.
// Issues objecter->pg_command() with a C_SafeCond that records completion in
// `done` and the result in `ret`, then waits on the local `mylock`/`cond`
// pair until `done` is set.
// NOTE(review): unlike osd_command(), the call is issued under the client
// `lock` rather than the local `mylock` -- confirm this is intentional.
957 int librados::RadosClient::pg_command(pg_t pgid
, vector
<string
>& cmd
,
958 const bufferlist
& inbl
,
959 bufferlist
*poutbl
, string
*prs
)
961 ceph::mutex mylock
= ceph::make_mutex("RadosClient::pg_command::mylock");
962 ceph::condition_variable cond
;
968 std::lock_guard l
{lock
};
969 objecter
->pg_command(pgid
, cmd
, inbl
, &tid
, poutbl
, prs
,
970 new C_SafeCond(mylock
, cond
, &done
, &ret
));
972 std::unique_lock l
{mylock
};
973 cond
.wait(l
, [&done
] { return done
;});
// Installs, replaces or removes the cluster-log callbacks.
// Runs under `lock` and checks that the client is CONNECTED.
//  - Both callbacks null: the current subscription (log_watch) is dropped.
//  - Otherwise `level` is mapped to a subscription name: "debug" ->
//    "log-debug", "info" -> "log-info", "warn"/"warning" -> "log-warn",
//    "err"/"error" -> "log-error", "sec" -> "log-sec". Any other value is
//    logged as invalid.
// If callbacks were already installed, the old subscription is dropped
// before the new one is requested with sub_want() and renew_subs();
// log_watch then remembers the active subscription name.
// NOTE(review): the return values and the assignments to log_cb / log_cb2 /
// log_cb_arg are not visible in this listing.
977 int librados::RadosClient::monitor_log(const string
& level
,
978 rados_log_callback_t cb
,
979 rados_log_callback2_t cb2
,
982 std::lock_guard
l(lock
);
984 if (state
!= CONNECTED
) {
988 if (cb
== NULL
&& cb2
== NULL
) {
990 ldout(cct
, 10) << __func__
<< " removing cb " << (void*)log_cb
991 << " " << (void*)log_cb2
<< dendl
;
992 monclient
.sub_unwant(log_watch
);
1001 if (level
== "debug") {
1002 watch_level
= "log-debug";
1003 } else if (level
== "info") {
1004 watch_level
= "log-info";
1005 } else if (level
== "warn" || level
== "warning") {
1006 watch_level
= "log-warn";
1007 } else if (level
== "err" || level
== "error") {
1008 watch_level
= "log-error";
1009 } else if (level
== "sec") {
1010 watch_level
= "log-sec";
1012 ldout(cct
, 10) << __func__
<< " invalid level " << level
<< dendl
;
1016 if (log_cb
|| log_cb2
)
1017 monclient
.sub_unwant(log_watch
);
1020 ldout(cct
, 10) << __func__
<< " add cb " << (void*)cb
<< " " << (void*)cb2
1021 << " level " << level
<< dendl
;
1022 monclient
.sub_want(watch_level
, 0, 0);
1024 monclient
.renew_subs();
1028 log_watch
= watch_level
;
// Delivers the entries of an MLog message to the registered log callbacks.
// Must be called with `lock` held (asserted). Messages whose version is not
// newer than log_last_version are skipped; otherwise log_last_version is
// advanced. If a callback is installed, each entry is rendered into a
// "stamp name prio msg" line plus who / name / level strings and a timespec,
// then handed to log_cb and/or log_cb2 (the per-callback guards are not
// visible here). The subscription is finally acknowledged with sub_got().
// NOTE(review): the callbacks run while `lock` is held, as far as the
// visible lines show.
1032 void librados::RadosClient::handle_log(MLog
*m
)
1034 ceph_assert(ceph_mutex_is_locked(lock
));
1035 ldout(cct
, 10) << __func__
<< " version " << m
->version
<< dendl
;
1037 if (log_last_version
< m
->version
) {
1038 log_last_version
= m
->version
;
1040 if (log_cb
|| log_cb2
) {
1041 for (std::deque
<LogEntry
>::iterator it
= m
->entries
.begin(); it
!= m
->entries
.end(); ++it
) {
1044 ss
<< e
.stamp
<< " " << e
.name
<< " " << e
.prio
<< " " << e
.msg
;
1045 string line
= ss
.str();
1046 string who
= stringify(e
.rank
) + " " + stringify(e
.addrs
);
1047 string name
= stringify(e
.name
);
1048 string level
= stringify(e
.prio
);
1049 struct timespec stamp
;
1050 e
.stamp
.to_timespec(&stamp
);
1052 ldout(cct
, 20) << __func__
<< " delivering " << ss
.str() << dendl
;
// First callback form: no separate entity-name argument.
1054 log_cb(log_cb_arg
, line
.c_str(), who
.c_str(),
1055 stamp
.tv_sec
, stamp
.tv_nsec
,
1056 e
.seq
, level
.c_str(), e
.msg
.c_str());
// Second callback form: additionally receives `name`.
1058 log_cb2(log_cb_arg
, line
.c_str(),
1060 who
.c_str(), name
.c_str(),
1061 stamp
.tv_sec
, stamp
.tv_nsec
,
1062 e
.seq
, level
.c_str(), e
.msg
.c_str());
1066 monclient
.sub_got(log_watch
, log_last_version
);
// Registers this client as a service daemon with the manager.
// Checks, in order: whether a registration already exists (service_daemon),
// whether `service` is one of the normal ceph entity types such as "osd" or
// "client" (not allowed, per the comment below), and whether `service` or
// `name` is empty. It then collects system info into daemon_metadata, marks
// the client as a service daemon, stores the service name and merges the
// caller's `metadata` (insert() leaves already-present keys untouched).
// Depending on the connection state the registration is either deferred
// (DISCONNECTED / CONNECTING) or sent now through mgrclient.
// NOTE(review): the error codes returned by each check are not visible in
// this listing.
1072 int librados::RadosClient::service_daemon_register(
1073 const std::string
& service
, ///< service name (e.g., 'rgw')
1074 const std::string
& name
, ///< daemon name (e.g., 'gwfoo')
1075 const std::map
<std::string
,std::string
>& metadata
)
1077 if (service_daemon
) {
1080 if (service
== "osd" ||
1082 service
== "client" ||
1085 // normal ceph entity types are not allowed!
1088 if (service
.empty() || name
.empty()) {
1092 collect_sys_info(&daemon_metadata
, cct
);
1094 ldout(cct
,10) << __func__
<< " " << service
<< "." << name
<< dendl
;
1095 service_daemon
= true;
1096 service_name
= service
;
1098 daemon_metadata
.insert(metadata
.begin(), metadata
.end());
1100 if (state
== DISCONNECTED
) {
1103 if (state
== CONNECTING
) {
1106 mgrclient
.service_daemon_register(service_name
, daemon_name
,
// Pushes a new status map for the registered service daemon to the manager.
// `status` is taken by rvalue reference and moved into
// mgrclient.service_daemon_update_status(). The client state is checked
// against CONNECTED first; the value returned when not connected is not
// visible in this listing.
1111 int librados::RadosClient::service_daemon_update_status(
1112 std::map
<std::string
,std::string
>&& status
)
1114 if (state
!= CONNECTED
) {
1117 return mgrclient
.service_daemon_update_status(std::move(status
));
// Returns the required feature set recorded in the monclient's MonMap,
// read through monclient.with_monmap().
1120 mon_feature_t
librados::RadosClient::get_required_monitor_features() const
1122 return monclient
.with_monmap([](const MonMap
&monmap
) {
1123 return monmap
.get_required_features(); } );
// Collects the ids of the inconsistent placement groups in pool_id.
// Sends a JSON "pg ls" manager command filtered on the "inconsistent" state,
// then parses the reply. Two reply shapes are handled: a top-level array of
// pg entries, or an object whose "pg_stats" member holds that array. Each
// entry is parsed and its "pgid" field is appended to *pgs.
// NOTE(review): the error codes returned on empty or unparsable output, and
// the null check on the find_obj() result, are not visible in this listing.
1126 int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id
,
1127 std::vector
<std::string
>* pgs
)
1129 vector
<string
> cmd
= {
1130 "{\"prefix\": \"pg ls\","
1131 "\"pool\": " + std::to_string(pool_id
) + ","
1132 "\"states\": [\"inconsistent\"],"
1133 "\"format\": \"json\"}"
1135 bufferlist inbl
, outbl
;
1137 if (auto ret
= mgr_command(cmd
, inbl
, &outbl
, &outstring
); ret
) {
1140 if (!outbl
.length()) {
1145 if (!parser
.parse(outbl
.c_str(), outbl
.length())) {
// Reply is an object: the pg entries live under "pg_stats".
1149 if (!parser
.is_array()) {
1150 JSONObj
*pgstat_obj
= parser
.find_obj("pg_stats");
1153 auto s
= pgstat_obj
->get_data();
1154 JSONParser pg_stats
;
1155 if (!pg_stats
.parse(s
.c_str(), s
.length())) {
1158 v
= pg_stats
.get_array_elements();
1160 v
= parser
.get_array_elements();
1164 if (!pg_json
.parse(i
.c_str(), i
.length())) {
1168 JSONDecoder::decode_json("pgid", pgid
, &pg_json
);
1169 pgs
->emplace_back(std::move(pgid
));
// Lists the config keys this observer tracks. The only visible entry is
// "rados_mon_op_timeout", the key handle_conf_change() reacts to.
// The array is a function-local static, so the returned pointer stays valid
// after the call. NOTE(review): the array terminator is not visible here.
1174 const char** librados::RadosClient::get_tracked_conf_keys() const
1176 static const char *config_keys
[] = {
1177 "rados_mon_op_timeout",
// Config-observer callback. When "rados_mon_op_timeout" is among the changed
// keys, the cached rados_mon_op_timeout member is reloaded from `conf` as a
// std::chrono::seconds value.
// NOTE(review): the parameter `conf` shadows the member of the same name that
// the constructor and destructor use; the closing lines are not visible in
// this listing.
1183 void librados::RadosClient::handle_conf_change(const ConfigProxy
& conf
,
1184 const std::set
<std::string
> &changed
)
1186 if (changed
.count("rados_mon_op_timeout")) {
1187 rados_mon_op_timeout
= conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");