1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/stringify.h"
31 #include "include/util.h"
33 #include "messages/MLog.h"
34 #include "msg/Messenger.h"
36 // needed for static_cast
37 #include "messages/PaxosServiceMessage.h"
38 #include "messages/MPoolOpReply.h"
39 #include "messages/MStatfsReply.h"
40 #include "messages/MGetPoolStatsReply.h"
41 #include "messages/MOSDOpReply.h"
42 #include "messages/MOSDMap.h"
43 #include "messages/MCommandReply.h"
45 #include "AioCompletionImpl.h"
46 #include "IoCtxImpl.h"
47 #include "PoolAsyncCompletionImpl.h"
48 #include "RadosClient.h"
50 #include "include/assert.h"
51 #include "common/EventTrace.h"
53 #define dout_subsys ceph_subsys_rados
55 #define dout_prefix *_dout << "librados: "
57 bool librados::RadosClient::ms_get_authorizer(int dest_type
,
58 AuthAuthorizer
**authorizer
,
60 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
61 /* monitor authorization is being handled on different layer */
62 if (dest_type
== CEPH_ENTITY_TYPE_MON
)
64 *authorizer
= monclient
.build_authorizer(dest_type
);
65 return *authorizer
!= NULL
;
68 librados::RadosClient::RadosClient(CephContext
*cct_
)
69 : Dispatcher(cct_
->get()),
70 cct_deleter
{cct_
, [](CephContext
*p
) {p
->put();}},
74 mgrclient(cct_
, nullptr),
78 lock("librados::RadosClient::lock"),
81 log_last_version(0), log_cb(NULL
), log_cb2(NULL
), log_cb_arg(NULL
),
82 finisher(cct
, "radosclient", "fn-radosclient")
86 int64_t librados::RadosClient::lookup_pool(const char *name
)
88 int r
= wait_for_osdmap();
93 int64_t ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
96 // Make sure we have the latest map
97 int r
= wait_for_latest_osdmap();
100 ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
107 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id
)
110 int r
= pool_requires_alignment2(pool_id
, &requires
);
112 // Report the answer as false. This is slightly problematic,
113 // since we do not actually know the answer yet.
120 // a safer version of pool_requires_alignment
121 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id
,
127 int r
= wait_for_osdmap();
132 return objecter
->with_osdmap([requires
, pool_id
](const OSDMap
& o
) {
133 if (!o
.have_pg_pool(pool_id
)) {
136 *requires
= o
.get_pg_pool(pool_id
)->requires_aligned_append();
141 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id
)
144 int r
= pool_required_alignment2(pool_id
, &alignment
);
152 // a safer version of pool_required_alignment
153 int librados::RadosClient::pool_required_alignment2(int64_t pool_id
,
159 int r
= wait_for_osdmap();
164 return objecter
->with_osdmap([alignment
, pool_id
](const OSDMap
&o
) {
165 if (!o
.have_pg_pool(pool_id
)) {
168 *alignment
= o
.get_pg_pool(pool_id
)->required_alignment();
173 int librados::RadosClient::pool_get_auid(uint64_t pool_id
,
174 unsigned long long *auid
)
176 int r
= wait_for_osdmap();
179 objecter
->with_osdmap([&](const OSDMap
& o
) {
180 const pg_pool_t
*pg
= o
.get_pg_pool(pool_id
);
191 int librados::RadosClient::pool_get_name(uint64_t pool_id
, std::string
*s
)
193 int r
= wait_for_osdmap();
196 objecter
->with_osdmap([&](const OSDMap
& o
) {
197 if (!o
.have_pg_pool(pool_id
)) {
201 *s
= o
.get_pool_name(pool_id
);
207 int librados::RadosClient::get_fsid(std::string
*s
)
211 Mutex::Locker
l(lock
);
213 oss
<< monclient
.get_fsid();
218 int librados::RadosClient::ping_monitor(const string mon_id
, string
*result
)
221 /* If we haven't yet connected, we have no way of telling whether we
222 * already built monc's initial monmap. If we are in the CONNECTED state,
223 * then it is safe to assume that we went through connect(), which does
226 if (state
!= CONNECTED
) {
227 ldout(cct
, 10) << __func__
<< " build monmap" << dendl
;
228 err
= monclient
.build_initial_monmap();
234 err
= monclient
.ping_monitor(mon_id
, result
);
238 int librados::RadosClient::connect()
240 common_init_finish(cct
);
244 // already connected?
245 if (state
== CONNECTING
)
247 if (state
== CONNECTED
)
252 err
= monclient
.build_initial_monmap();
257 messenger
= Messenger::create_client_messenger(cct
, "radosclient");
261 // require OSDREPLYMUX feature. this means we will fail to talk to
262 // old servers. this is necessary because otherwise we won't know
263 // how to decompose the reply data into its constituent pieces.
264 messenger
->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX
));
266 ldout(cct
, 1) << "starting msgr at " << messenger
->get_myaddr() << dendl
;
268 ldout(cct
, 1) << "starting objecter" << dendl
;
270 objecter
= new (std::nothrow
) Objecter(cct
, messenger
, &monclient
,
272 cct
->_conf
->rados_mon_op_timeout
,
273 cct
->_conf
->rados_osd_op_timeout
);
276 objecter
->set_balanced_budget();
278 monclient
.set_messenger(messenger
);
279 mgrclient
.set_messenger(messenger
);
282 messenger
->add_dispatcher_head(&mgrclient
);
283 messenger
->add_dispatcher_tail(objecter
);
284 messenger
->add_dispatcher_tail(this);
288 ldout(cct
, 1) << "setting wanted keys" << dendl
;
289 monclient
.set_want_keys(
290 CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
| CEPH_ENTITY_TYPE_MGR
);
291 ldout(cct
, 1) << "calling monclient init" << dendl
;
292 err
= monclient
.init();
294 ldout(cct
, 0) << conf
->name
<< " initialization error " << cpp_strerror(-err
) << dendl
;
299 err
= monclient
.authenticate(conf
->client_mount_timeout
);
301 ldout(cct
, 0) << conf
->name
<< " authentication error " << cpp_strerror(-err
) << dendl
;
305 messenger
->set_myname(entity_name_t::CLIENT(monclient
.get_global_id()));
307 // MgrClient needs this (it doesn't have MonClient reference itself)
308 monclient
.sub_want("mgrmap", 0, 0);
309 monclient
.renew_subs();
311 if (service_daemon
) {
312 ldout(cct
, 10) << __func__
<< " registering as " << service_name
<< "."
313 << daemon_name
<< dendl
;
314 mgrclient
.service_daemon_register(service_name
, daemon_name
,
319 objecter
->set_client_incarnation(0);
328 instance_id
= monclient
.get_global_id();
332 ldout(cct
, 1) << "init done" << dendl
;
337 state
= DISCONNECTED
;
352 void librados::RadosClient::shutdown()
355 if (state
== DISCONNECTED
) {
360 bool need_objecter
= false;
361 if (objecter
&& objecter
->initialized
) {
362 need_objecter
= true;
365 if (state
== CONNECTED
) {
367 // make sure watch callbacks are flushed
370 finisher
.wait_for_empty();
373 state
= DISCONNECTED
;
375 timer
.shutdown(); // will drop+retake lock
378 objecter
->shutdown();
380 mgrclient
.shutdown();
382 monclient
.shutdown();
384 messenger
->shutdown();
387 ldout(cct
, 1) << "shutdown" << dendl
;
390 int librados::RadosClient::watch_flush()
392 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
393 Mutex
mylock("RadosClient::watch_flush::mylock");
396 objecter
->linger_callback_flush(new C_SafeCond(&mylock
, &cond
, &done
));
403 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
407 struct C_aio_watch_flush_Complete
: public Context
{
408 librados::RadosClient
*client
;
409 librados::AioCompletionImpl
*c
;
411 C_aio_watch_flush_Complete(librados::RadosClient
*_client
, librados::AioCompletionImpl
*_c
)
412 : client(_client
), c(_c
) {
416 void finish(int r
) override
{
422 if (c
->callback_complete
||
424 client
->finisher
.queue(new librados::C_AioComplete(c
));
430 int librados::RadosClient::async_watch_flush(AioCompletionImpl
*c
)
432 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
433 Context
*oncomplete
= new C_aio_watch_flush_Complete(this, c
);
434 objecter
->linger_callback_flush(oncomplete
);
435 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
439 uint64_t librados::RadosClient::get_instance_id()
444 librados::RadosClient::~RadosClient()
453 int librados::RadosClient::create_ioctx(const char *name
, IoCtxImpl
**io
)
455 int64_t poolid
= lookup_pool(name
);
460 *io
= new librados::IoCtxImpl(this, objecter
, poolid
, CEPH_NOSNAP
);
464 int librados::RadosClient::create_ioctx(int64_t pool_id
, IoCtxImpl
**io
)
466 *io
= new librados::IoCtxImpl(this, objecter
, pool_id
, CEPH_NOSNAP
);
470 bool librados::RadosClient::ms_dispatch(Message
*m
)
474 Mutex::Locker
l(lock
);
475 if (state
== DISCONNECTED
) {
476 ldout(cct
, 10) << "disconnected, discarding " << *m
<< dendl
;
485 void librados::RadosClient::ms_handle_connect(Connection
*con
)
489 bool librados::RadosClient::ms_handle_reset(Connection
*con
)
494 void librados::RadosClient::ms_handle_remote_reset(Connection
*con
)
498 bool librados::RadosClient::ms_handle_refused(Connection
*con
)
503 bool librados::RadosClient::_dispatch(Message
*m
)
505 assert(lock
.is_locked());
506 switch (m
->get_type()) {
508 case CEPH_MSG_OSD_MAP
:
513 case CEPH_MSG_MDS_MAP
:
518 handle_log(static_cast<MLog
*>(m
));
529 int librados::RadosClient::wait_for_osdmap()
531 assert(!lock
.is_locked_by_me());
533 if (state
!= CONNECTED
) {
537 bool need_map
= false;
538 objecter
->with_osdmap([&](const OSDMap
& o
) {
539 if (o
.get_epoch() == 0) {
545 Mutex::Locker
l(lock
);
548 if (cct
->_conf
->rados_mon_op_timeout
> 0)
549 timeout
.set_from_double(cct
->_conf
->rados_mon_op_timeout
);
551 if (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
552 ldout(cct
, 10) << __func__
<< " waiting" << dendl
;
553 utime_t start
= ceph_clock_now();
554 while (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
555 if (timeout
.is_zero()) {
558 cond
.WaitInterval(lock
, timeout
);
559 utime_t elapsed
= ceph_clock_now() - start
;
560 if (elapsed
> timeout
) {
561 lderr(cct
) << "timed out waiting for first osdmap from monitors"
567 ldout(cct
, 10) << __func__
<< " done waiting" << dendl
;
576 int librados::RadosClient::wait_for_latest_osdmap()
578 Mutex
mylock("RadosClient::wait_for_latest_osdmap");
582 objecter
->wait_for_latest_osdmap(new C_SafeCond(&mylock
, &cond
, &done
));
592 int librados::RadosClient::pool_list(std::list
<std::pair
<int64_t, string
> >& v
)
594 int r
= wait_for_osdmap();
598 objecter
->with_osdmap([&](const OSDMap
& o
) {
599 for (auto p
: o
.get_pools())
600 v
.push_back(std::make_pair(p
.first
, o
.get_pool_name(p
.first
)));
605 int librados::RadosClient::get_pool_stats(std::list
<string
>& pools
,
606 map
<string
,::pool_stat_t
>& result
)
608 Mutex
mylock("RadosClient::get_pool_stats::mylock");
613 objecter
->get_pool_stats(pools
, &result
, new C_SafeCond(&mylock
, &cond
, &done
,
624 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
625 const std::string
& pool
)
628 objecter
->with_osdmap([&](const OSDMap
& osdmap
) {
629 int64_t poolid
= osdmap
.lookup_pg_pool_name(pool
);
631 ret
= osdmap
.get_pg_pool(poolid
)->is_unmanaged_snaps_mode();
636 int librados::RadosClient::get_fs_stats(ceph_statfs
& stats
)
638 Mutex
mylock ("RadosClient::get_fs_stats::mylock");
644 objecter
->get_fs_stats(stats
, new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
648 while (!done
) cond
.Wait(mylock
);
654 void librados::RadosClient::get() {
655 Mutex::Locker
l(lock
);
660 bool librados::RadosClient::put() {
661 Mutex::Locker
l(lock
);
664 return (refcnt
== 0);
667 int librados::RadosClient::pool_create(string
& name
, unsigned long long auid
,
670 int r
= wait_for_osdmap();
675 Mutex
mylock ("RadosClient::pool_create::mylock");
679 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
680 reply
= objecter
->create_pool(name
, onfinish
, auid
, crush_rule
);
693 int librados::RadosClient::pool_create_async(string
& name
, PoolAsyncCompletionImpl
*c
,
694 unsigned long long auid
,
697 int r
= wait_for_osdmap();
701 Context
*onfinish
= new C_PoolAsync_Safe(c
);
702 r
= objecter
->create_pool(name
, onfinish
, auid
, crush_rule
);
709 int librados::RadosClient::pool_get_base_tier(int64_t pool_id
, int64_t* base_tier
)
711 int r
= wait_for_osdmap();
716 objecter
->with_osdmap([&](const OSDMap
& o
) {
717 const pg_pool_t
* pool
= o
.get_pg_pool(pool_id
);
719 if (pool
->tier_of
< 0) {
720 *base_tier
= pool_id
;
722 *base_tier
= pool
->tier_of
;
732 int librados::RadosClient::pool_delete(const char *name
)
734 int r
= wait_for_osdmap();
739 Mutex
mylock("RadosClient::pool_delete::mylock");
743 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &ret
);
744 ret
= objecter
->delete_pool(name
, onfinish
);
757 int librados::RadosClient::pool_delete_async(const char *name
, PoolAsyncCompletionImpl
*c
)
759 int r
= wait_for_osdmap();
763 Context
*onfinish
= new C_PoolAsync_Safe(c
);
764 r
= objecter
->delete_pool(name
, onfinish
);
771 void librados::RadosClient::blacklist_self(bool set
) {
772 Mutex::Locker
l(lock
);
773 objecter
->blacklist_self(set
);
776 int librados::RadosClient::blacklist_add(const string
& client_address
,
777 uint32_t expire_seconds
)
780 if (!addr
.parse(client_address
.c_str(), 0)) {
781 lderr(cct
) << "unable to parse address " << client_address
<< dendl
;
785 std::stringstream cmd
;
787 << "\"prefix\": \"osd blacklist\", "
788 << "\"blacklistop\": \"add\", "
789 << "\"addr\": \"" << client_address
<< "\"";
790 if (expire_seconds
!= 0) {
791 cmd
<< ", \"expire\": " << expire_seconds
<< ".0";
795 std::vector
<std::string
> cmds
;
796 cmds
.push_back(cmd
.str());
798 int r
= mon_command(cmds
, inbl
, NULL
, NULL
);
803 // ensure we have the latest osd map epoch before proceeding
804 r
= wait_for_latest_osdmap();
808 int librados::RadosClient::mon_command(const vector
<string
>& cmd
,
809 const bufferlist
&inbl
,
810 bufferlist
*outbl
, string
*outs
)
812 Mutex
mylock("RadosClient::mon_command::mylock");
817 monclient
.start_mon_command(cmd
, inbl
, outbl
, outs
,
818 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
828 int librados::RadosClient::mgr_command(const vector
<string
>& cmd
,
829 const bufferlist
&inbl
,
830 bufferlist
*outbl
, string
*outs
)
832 Mutex::Locker
l(lock
);
835 int r
= mgrclient
.start_command(cmd
, inbl
, outbl
, outs
, &cond
);
847 int librados::RadosClient::mon_command(int rank
, const vector
<string
>& cmd
,
848 const bufferlist
&inbl
,
849 bufferlist
*outbl
, string
*outs
)
851 Mutex
mylock("RadosClient::mon_command::mylock");
856 monclient
.start_mon_command(rank
, cmd
, inbl
, outbl
, outs
,
857 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
866 int librados::RadosClient::mon_command(string name
, const vector
<string
>& cmd
,
867 const bufferlist
&inbl
,
868 bufferlist
*outbl
, string
*outs
)
870 Mutex
mylock("RadosClient::mon_command::mylock");
875 monclient
.start_mon_command(name
, cmd
, inbl
, outbl
, outs
,
876 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
885 int librados::RadosClient::osd_command(int osd
, vector
<string
>& cmd
,
886 const bufferlist
& inbl
,
887 bufferlist
*poutbl
, string
*prs
)
889 Mutex
mylock("RadosClient::osd_command::mylock");
899 // XXX do anything with tid?
900 objecter
->osd_command(osd
, cmd
, inbl
, &tid
, poutbl
, prs
,
901 new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
910 int librados::RadosClient::pg_command(pg_t pgid
, vector
<string
>& cmd
,
911 const bufferlist
& inbl
,
912 bufferlist
*poutbl
, string
*prs
)
914 Mutex
mylock("RadosClient::pg_command::mylock");
920 objecter
->pg_command(pgid
, cmd
, inbl
, &tid
, poutbl
, prs
,
921 new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
930 int librados::RadosClient::monitor_log(const string
& level
,
931 rados_log_callback_t cb
,
932 rados_log_callback2_t cb2
,
935 Mutex::Locker
l(lock
);
937 if (state
!= CONNECTED
) {
941 if (cb
== NULL
&& cb2
== NULL
) {
943 ldout(cct
, 10) << __func__
<< " removing cb " << (void*)log_cb
944 << " " << (void*)log_cb2
<< dendl
;
945 monclient
.sub_unwant(log_watch
);
954 if (level
== "debug") {
955 watch_level
= "log-debug";
956 } else if (level
== "info") {
957 watch_level
= "log-info";
958 } else if (level
== "warn" || level
== "warning") {
959 watch_level
= "log-warn";
960 } else if (level
== "err" || level
== "error") {
961 watch_level
= "log-error";
962 } else if (level
== "sec") {
963 watch_level
= "log-sec";
965 ldout(cct
, 10) << __func__
<< " invalid level " << level
<< dendl
;
969 if (log_cb
|| log_cb2
)
970 monclient
.sub_unwant(log_watch
);
973 ldout(cct
, 10) << __func__
<< " add cb " << (void*)cb
<< " " << (void*)cb2
974 << " level " << level
<< dendl
;
975 monclient
.sub_want(watch_level
, 0, 0);
977 monclient
.renew_subs();
981 log_watch
= watch_level
;
985 void librados::RadosClient::handle_log(MLog
*m
)
987 assert(lock
.is_locked());
988 ldout(cct
, 10) << __func__
<< " version " << m
->version
<< dendl
;
990 if (log_last_version
< m
->version
) {
991 log_last_version
= m
->version
;
993 if (log_cb
|| log_cb2
) {
994 for (std::deque
<LogEntry
>::iterator it
= m
->entries
.begin(); it
!= m
->entries
.end(); ++it
) {
997 ss
<< e
.stamp
<< " " << e
.name
<< " " << e
.prio
<< " " << e
.msg
;
998 string line
= ss
.str();
999 string who
= stringify(e
.who
);
1000 string name
= stringify(e
.name
);
1001 string level
= stringify(e
.prio
);
1002 struct timespec stamp
;
1003 e
.stamp
.to_timespec(&stamp
);
1005 ldout(cct
, 20) << __func__
<< " delivering " << ss
.str() << dendl
;
1007 log_cb(log_cb_arg
, line
.c_str(), who
.c_str(),
1008 stamp
.tv_sec
, stamp
.tv_nsec
,
1009 e
.seq
, level
.c_str(), e
.msg
.c_str());
1011 log_cb2(log_cb_arg
, line
.c_str(),
1013 who
.c_str(), name
.c_str(),
1014 stamp
.tv_sec
, stamp
.tv_nsec
,
1015 e
.seq
, level
.c_str(), e
.msg
.c_str());
1019 monclient
.sub_got(log_watch
, log_last_version
);
1025 int librados::RadosClient::service_daemon_register(
1026 const std::string
& service
, ///< service name (e.g., 'rgw')
1027 const std::string
& name
, ///< daemon name (e.g., 'gwfoo')
1028 const std::map
<std::string
,std::string
>& metadata
)
1030 if (service_daemon
) {
1033 if (service
== "osd" ||
1035 service
== "client" ||
1038 // normal ceph entity types are not allowed!
1041 if (service
.empty() || name
.empty()) {
1045 collect_sys_info(&daemon_metadata
, cct
);
1047 ldout(cct
,10) << __func__
<< " " << service
<< "." << name
<< dendl
;
1048 service_daemon
= true;
1049 service_name
= service
;
1051 daemon_metadata
.insert(metadata
.begin(), metadata
.end());
1053 if (state
== DISCONNECTED
) {
1056 if (state
== CONNECTING
) {
1059 mgrclient
.service_daemon_register(service_name
, daemon_name
,
1064 int librados::RadosClient::service_daemon_update_status(
1065 const std::map
<std::string
,std::string
>& status
)
1067 if (state
!= CONNECTED
) {
1070 return mgrclient
.service_daemon_update_status(status
);