1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/stringify.h"
32 #include "messages/MLog.h"
33 #include "msg/Messenger.h"
35 // needed for static_cast
36 #include "messages/PaxosServiceMessage.h"
37 #include "messages/MPoolOpReply.h"
38 #include "messages/MStatfsReply.h"
39 #include "messages/MGetPoolStatsReply.h"
40 #include "messages/MOSDOpReply.h"
41 #include "messages/MOSDMap.h"
42 #include "messages/MCommandReply.h"
44 #include "AioCompletionImpl.h"
45 #include "IoCtxImpl.h"
46 #include "PoolAsyncCompletionImpl.h"
47 #include "RadosClient.h"
49 #include "include/assert.h"
50 #include "common/EventTrace.h"
52 #define dout_subsys ceph_subsys_rados
54 #define dout_prefix *_dout << "librados: "
// Obtains an authorizer for a peer of type dest_type: stores the result of
// monclient.build_authorizer(dest_type) in *authorizer and returns whether
// that pointer is non-NULL. CEPH_ENTITY_TYPE_MON is special-cased because
// monitor authorization is handled on a different layer.
56 bool librados::RadosClient::ms_get_authorizer(int dest_type
,
57 AuthAuthorizer
**authorizer
,
59 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
60 /* monitor authorization is being handled on different layer */
61 if (dest_type
== CEPH_ENTITY_TYPE_MON
)
63 *authorizer
= monclient
.build_authorizer(dest_type
);
64 return *authorizer
!= NULL
;
// Constructs the client around cct_. The Dispatcher base is given
// cct_->get(); cct_deleter pairs cct_ with a lambda that calls put() on it.
// Also initialises mgrclient, the named lock, the log bookkeeping
// (log_last_version = 0, all log callbacks/arg NULL) and the "radosclient"
// finisher.
67 librados::RadosClient::RadosClient(CephContext
*cct_
)
68 : Dispatcher(cct_
->get()),
69 cct_deleter
{cct_
, [](CephContext
*p
) {p
->put();}},
73 mgrclient(cct_
, nullptr),
77 lock("librados::RadosClient::lock"),
80 log_last_version(0), log_cb(NULL
), log_cb2(NULL
), log_cb_arg(NULL
),
81 finisher(cct
, "radosclient", "fn-radosclient")
// Looks a pool up by name. After wait_for_osdmap(), the name is resolved with
// OSDMap::lookup_pg_pool_name through objecter->with_osdmap. A second lookup
// is performed after wait_for_latest_osdmap(), so that the answer can be
// based on the latest map.
// NOTE(review): the condition guarding the second lookup is not visible in
// this excerpt.
85 int64_t librados::RadosClient::lookup_pool(const char *name
)
87 int r
= wait_for_osdmap();
92 int64_t ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
95 // Make sure we have the latest map
96 int r
= wait_for_latest_osdmap();
99 ret
= objecter
->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name
),
// Boolean convenience wrapper around
// pool_requires_alignment2(pool_id, &requires). Per the inline remark below,
// an answer that could not be determined is reported as false, so callers
// cannot tell "no" from "unknown" through this entry point.
106 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id
)
109 int r
= pool_requires_alignment2(pool_id
, &requires
);
111 // Cast answer to false, this is a little bit problematic
112 // since we really don't know the answer yet, say.
119 // a safer version of pool_requires_alignment
// Calls wait_for_osdmap() first, then inspects the map inside
// objecter->with_osdmap: the lambda checks o.have_pg_pool(pool_id) and writes
// o.get_pg_pool(pool_id)->requires_aligned_append() into *requires. The
// function returns whatever with_osdmap yields from that lambda.
120 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id
,
126 int r
= wait_for_osdmap();
131 return objecter
->with_osdmap([requires
, pool_id
](const OSDMap
& o
) {
132 if (!o
.have_pg_pool(pool_id
)) {
135 *requires
= o
.get_pg_pool(pool_id
)->requires_aligned_append();
// Convenience wrapper that delegates to
// pool_required_alignment2(pool_id, &alignment).
// NOTE(review): how the status code r is folded into the returned uint64_t
// is not visible in this excerpt.
140 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id
)
143 int r
= pool_required_alignment2(pool_id
, &alignment
);
151 // a safer version of pool_required_alignment
// Calls wait_for_osdmap() first, then inspects the map inside
// objecter->with_osdmap: the lambda checks o.have_pg_pool(pool_id) and writes
// o.get_pg_pool(pool_id)->required_alignment() into *alignment. The function
// returns whatever with_osdmap yields from that lambda.
152 int librados::RadosClient::pool_required_alignment2(int64_t pool_id
,
158 int r
= wait_for_osdmap();
163 return objecter
->with_osdmap([alignment
, pool_id
](const OSDMap
&o
) {
164 if (!o
.have_pg_pool(pool_id
)) {
167 *alignment
= o
.get_pg_pool(pool_id
)->required_alignment();
// After wait_for_osdmap(), fetches the pg_pool_t for pool_id from the osdmap
// inside objecter->with_osdmap (lambda captures by reference).
// NOTE(review): auid is presumably the output destination; the store through
// it is not visible in this excerpt -- confirm.
172 int librados::RadosClient::pool_get_auid(uint64_t pool_id
,
173 unsigned long long *auid
)
175 int r
= wait_for_osdmap();
178 objecter
->with_osdmap([&](const OSDMap
& o
) {
179 const pg_pool_t
*pg
= o
.get_pg_pool(pool_id
);
// After wait_for_osdmap(), resolves pool_id to its name: inside
// objecter->with_osdmap the lambda checks o.have_pg_pool(pool_id) and assigns
// o.get_pool_name(pool_id) to *s.
190 int librados::RadosClient::pool_get_name(uint64_t pool_id
, std::string
*s
)
192 int r
= wait_for_osdmap();
195 objecter
->with_osdmap([&](const OSDMap
& o
) {
196 if (!o
.have_pg_pool(pool_id
)) {
200 *s
= o
.get_pool_name(pool_id
);
// While holding `lock`, streams monclient.get_fsid() into `oss`.
// NOTE(review): the copy from `oss` into *s is not visible in this excerpt.
206 int librados::RadosClient::get_fsid(std::string
*s
)
210 Mutex::Locker
l(lock
);
212 oss
<< monclient
.get_fsid();
// Pings the monitor named mon_id through
// monclient.ping_monitor(mon_id, result). When the client is not in the
// CONNECTED state the initial monmap is built first
// (monclient.build_initial_monmap()), since connect() has not done it.
217 int librados::RadosClient::ping_monitor(const string mon_id
, string
*result
)
220 /* If we haven't yet connected, we have no way of telling whether we
221 * already built monc's initial monmap. IF we are in CONNECTED state,
222 * then it is safe to assume that we went through connect(), which does
225 if (state
!= CONNECTED
) {
226 ldout(cct
, 10) << __func__
<< " build monmap" << dendl
;
227 err
= monclient
.build_initial_monmap();
233 err
= monclient
.ping_monitor(mon_id
, result
);
// Brings the client up. Visible sequence:
//   1. common_init_finish(cct), then checks for the CONNECTING and CONNECTED
//      states ("already connected?").
//   2. monclient.build_initial_monmap().
//   3. Creates the "radosclient" client messenger and installs a lossy-client
//      default policy that requires CEPH_FEATURE_OSDREPLYMUX.
//   4. Allocates the Objecter with new (std::nothrow), passing the
//      rados_mon_op_timeout / rados_osd_op_timeout settings, and enables its
//      balanced budget.
//   5. Hands the messenger to monclient and mgrclient; registers dispatchers:
//      mgrclient at the head, then objecter and this at the tail.
//   6. Requests MON | OSD | MGR keys, runs monclient.init() and
//      monclient.authenticate(conf->client_mount_timeout); each failure is
//      logged at level 0 with cpp_strerror(-err).
//   7. Names the messenger after the monclient global id, subscribes to
//      "mgrmap", sets client incarnation 0 and records instance_id.
// NOTE(review): the trailing `state = DISCONNECTED` is presumably the failure
// path; the surrounding control flow is not visible in this excerpt.
237 int librados::RadosClient::connect()
239 common_init_finish(cct
);
243 // already connected?
244 if (state
== CONNECTING
)
246 if (state
== CONNECTED
)
251 err
= monclient
.build_initial_monmap();
256 messenger
= Messenger::create_client_messenger(cct
, "radosclient");
260 // require OSDREPLYMUX feature. this means we will fail to talk to
261 // old servers. this is necessary because otherwise we won't know
262 // how to decompose the reply data into its constituent pieces.
263 messenger
->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX
));
265 ldout(cct
, 1) << "starting msgr at " << messenger
->get_myaddr() << dendl
;
267 ldout(cct
, 1) << "starting objecter" << dendl
;
269 objecter
= new (std::nothrow
) Objecter(cct
, messenger
, &monclient
,
271 cct
->_conf
->rados_mon_op_timeout
,
272 cct
->_conf
->rados_osd_op_timeout
);
275 objecter
->set_balanced_budget();
277 monclient
.set_messenger(messenger
);
278 mgrclient
.set_messenger(messenger
);
281 messenger
->add_dispatcher_head(&mgrclient
);
282 messenger
->add_dispatcher_tail(objecter
);
283 messenger
->add_dispatcher_tail(this);
287 ldout(cct
, 1) << "setting wanted keys" << dendl
;
288 monclient
.set_want_keys(
289 CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
| CEPH_ENTITY_TYPE_MGR
);
290 ldout(cct
, 1) << "calling monclient init" << dendl
;
291 err
= monclient
.init();
293 ldout(cct
, 0) << conf
->name
<< " initialization error " << cpp_strerror(-err
) << dendl
;
298 err
= monclient
.authenticate(conf
->client_mount_timeout
);
300 ldout(cct
, 0) << conf
->name
<< " authentication error " << cpp_strerror(-err
) << dendl
;
304 messenger
->set_myname(entity_name_t::CLIENT(monclient
.get_global_id()));
306 // MgrClient needs this (it doesn't have MonClient reference itself)
307 monclient
.sub_want("mgrmap", 0, 0);
308 monclient
.renew_subs();
312 objecter
->set_client_incarnation(0);
321 instance_id
= monclient
.get_global_id();
325 ldout(cct
, 1) << "init done" << dendl
;
330 state
= DISCONNECTED
;
// Tears the client down. Visible steps, in source order:
//   - checks whether the state is already DISCONNECTED;
//   - sets need_objecter when objecter exists and objecter->initialized;
//   - when CONNECTED, waits for the finisher to drain so that watch
//     callbacks are flushed;
//   - sets state = DISCONNECTED;
//   - shuts down timer (which drops and retakes the lock), objecter,
//     mgrclient, monclient and messenger, then logs "shutdown".
// NOTE(review): the guards around the individual shutdown calls (for example
// the use of need_objecter) are not visible in this excerpt.
345 void librados::RadosClient::shutdown()
348 if (state
== DISCONNECTED
) {
353 bool need_objecter
= false;
354 if (objecter
&& objecter
->initialized
) {
355 need_objecter
= true;
358 if (state
== CONNECTED
) {
360 // make sure watch callbacks are flushed
363 finisher
.wait_for_empty();
366 state
= DISCONNECTED
;
368 timer
.shutdown(); // will drop+retake lock
371 objecter
->shutdown();
373 mgrclient
.shutdown();
375 monclient
.shutdown();
377 messenger
->shutdown();
380 ldout(cct
, 1) << "shutdown" << dendl
;
// Requests a flush of linger callbacks: passes
// objecter->linger_callback_flush a C_SafeCond bound to the local mylock,
// cond and done. Entry and exit are logged at level 10.
// NOTE(review): the wait on cond/done is not visible in this excerpt.
383 int librados::RadosClient::watch_flush()
385 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
386 Mutex
mylock("RadosClient::watch_flush::mylock");
389 objecter
->linger_callback_flush(new C_SafeCond(&mylock
, &cond
, &done
));
396 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
// Context used by the asynchronous watch flush. It stores the owning
// RadosClient and the AioCompletionImpl to complete. finish(int r) queues a
// librados::C_AioComplete for `c` on client->finisher, under a condition that
// involves c->callback_complete.
// NOTE(review): the remainder of that condition and any state updates on `c`
// are not visible in this excerpt.
400 struct C_aio_watch_flush_Complete
: public Context
{
401 librados::RadosClient
*client
;
402 librados::AioCompletionImpl
*c
;
404 C_aio_watch_flush_Complete(librados::RadosClient
*_client
, librados::AioCompletionImpl
*_c
)
405 : client(_client
), c(_c
) {
409 void finish(int r
) override
{
415 if (c
->callback_complete
||
417 client
->finisher
.queue(new librados::C_AioComplete(c
));
// Asynchronous watch flush: wraps `c` in a C_aio_watch_flush_Complete bound
// to this client and passes it to objecter->linger_callback_flush as the
// completion. Entry and exit are logged at level 10.
423 int librados::RadosClient::async_watch_flush(AioCompletionImpl
*c
)
425 ldout(cct
, 10) << __func__
<< " enter" << dendl
;
426 Context
*oncomplete
= new C_aio_watch_flush_Complete(this, c
);
427 objecter
->linger_callback_flush(oncomplete
);
428 ldout(cct
, 10) << __func__
<< " exit" << dendl
;
432 uint64_t librados::RadosClient::get_instance_id()
437 librados::RadosClient::~RadosClient()
// Creates an I/O context for the pool called `name`: resolves the id with
// lookup_pool(name) and stores a new IoCtxImpl(this, objecter, poolid,
// CEPH_NOSNAP) in *io.
// NOTE(review): handling of a failed lookup is not visible in this excerpt.
446 int librados::RadosClient::create_ioctx(const char *name
, IoCtxImpl
**io
)
448 int64_t poolid
= lookup_pool(name
);
453 *io
= new librados::IoCtxImpl(this, objecter
, poolid
, CEPH_NOSNAP
);
// Creates an I/O context directly from a pool id: stores a new
// IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP) in *io. No name lookup is
// involved in the visible code.
457 int librados::RadosClient::create_ioctx(int64_t pool_id
, IoCtxImpl
**io
)
459 *io
= new librados::IoCtxImpl(this, objecter
, pool_id
, CEPH_NOSNAP
);
// Message entry point. Takes `lock`; when the client is DISCONNECTED the
// message is logged at level 10 as "disconnected, discarding".
// NOTE(review): what happens to the message afterwards (release, hand-off to
// _dispatch) is not visible in this excerpt.
463 bool librados::RadosClient::ms_dispatch(Message
*m
)
467 Mutex::Locker
l(lock
);
468 if (state
== DISCONNECTED
) {
469 ldout(cct
, 10) << "disconnected, discarding " << *m
<< dendl
;
478 void librados::RadosClient::ms_handle_connect(Connection
*con
)
482 bool librados::RadosClient::ms_handle_reset(Connection
*con
)
487 void librados::RadosClient::ms_handle_remote_reset(Connection
*con
)
491 bool librados::RadosClient::ms_handle_refused(Connection
*con
)
// Dispatches a message by type. Asserts that `lock` is held, then switches
// on m->get_type(). The visible cases are CEPH_MSG_OSD_MAP and
// CEPH_MSG_MDS_MAP; log messages are cast to MLog* and passed to
// handle_log().
// NOTE(review): the case label for the MLog branch and the return values are
// not visible in this excerpt.
496 bool librados::RadosClient::_dispatch(Message
*m
)
498 assert(lock
.is_locked());
499 switch (m
->get_type()) {
501 case CEPH_MSG_OSD_MAP
:
506 case CEPH_MSG_MDS_MAP
:
511 handle_log(static_cast<MLog
*>(m
));
// Waits until the objecter's osdmap has a non-zero epoch.
//   - Asserts the caller does not already hold `lock`.
//   - Checks that the client is CONNECTED.
//   - Probes the epoch without the lock; need_map is the flag for that probe.
//   - Under `lock`, takes the timeout from rados_mon_op_timeout when that
//     setting is > 0, then loops while the epoch is still 0. The loop tests
//     timeout.is_zero() and uses cond.WaitInterval(lock, timeout).
//   - If the time elapsed since the wait began exceeds the timeout, an error
//     "timed out waiting for first osdmap from monitors" is logged.
// NOTE(review): the return codes and the untimed wait branch are not visible
// in this excerpt.
522 int librados::RadosClient::wait_for_osdmap()
524 assert(!lock
.is_locked_by_me());
526 if (state
!= CONNECTED
) {
530 bool need_map
= false;
531 objecter
->with_osdmap([&](const OSDMap
& o
) {
532 if (o
.get_epoch() == 0) {
538 Mutex::Locker
l(lock
);
541 if (cct
->_conf
->rados_mon_op_timeout
> 0)
542 timeout
.set_from_double(cct
->_conf
->rados_mon_op_timeout
);
544 if (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
545 ldout(cct
, 10) << __func__
<< " waiting" << dendl
;
546 utime_t start
= ceph_clock_now();
547 while (objecter
->with_osdmap(std::mem_fn(&OSDMap::get_epoch
)) == 0) {
548 if (timeout
.is_zero()) {
551 cond
.WaitInterval(lock
, timeout
);
552 utime_t elapsed
= ceph_clock_now() - start
;
553 if (elapsed
> timeout
) {
554 lderr(cct
) << "timed out waiting for first osdmap from monitors"
560 ldout(cct
, 10) << __func__
<< " done waiting" << dendl
;
// Asks the objecter for the latest osdmap, passing a C_SafeCond bound to the
// local mylock, cond and done as the completion.
// NOTE(review): the blocking wait on that completion is not visible in this
// excerpt.
569 int librados::RadosClient::wait_for_latest_osdmap()
571 Mutex
mylock("RadosClient::wait_for_latest_osdmap");
575 objecter
->wait_for_latest_osdmap(new C_SafeCond(&mylock
, &cond
, &done
));
// Lists pools. After wait_for_osdmap(), walks o.get_pools() inside
// objecter->with_osdmap and appends one (pool id, pool name) pair per entry
// to v. Existing contents of v are not cleared by the visible code.
585 int librados::RadosClient::pool_list(std::list
<std::pair
<int64_t, string
> >& v
)
587 int r
= wait_for_osdmap();
591 objecter
->with_osdmap([&](const OSDMap
& o
) {
592 for (auto p
: o
.get_pools())
593 v
.push_back(std::make_pair(p
.first
, o
.get_pool_name(p
.first
)));
// Requests statistics for the named pools: calls
// objecter->get_pool_stats(pools, &result, ...) with a C_SafeCond completion
// bound to the local mylock, cond and done.
// NOTE(review): the rest of the completion arguments and the wait are not
// visible in this excerpt.
598 int librados::RadosClient::get_pool_stats(std::list
<string
>& pools
,
599 map
<string
,::pool_stat_t
>& result
)
601 Mutex
mylock("RadosClient::get_pool_stats::mylock");
606 objecter
->get_pool_stats(pools
, &result
, new C_SafeCond(&mylock
, &cond
, &done
,
// Reports the snapshot mode of the pool named `pool`. Inside
// objecter->with_osdmap the name is resolved with lookup_pg_pool_name and
// `ret` is set from get_pg_pool(poolid)->is_unmanaged_snaps_mode().
// NOTE(review): the guard for an unknown pool name is not visible in this
// excerpt.
617 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
618 const std::string
& pool
)
621 objecter
->with_osdmap([&](const OSDMap
& osdmap
) {
622 int64_t poolid
= osdmap
.lookup_pg_pool_name(pool
);
624 ret
= osdmap
.get_pg_pool(poolid
)->is_unmanaged_snaps_mode();
// Synchronous cluster statfs: issues objecter->get_fs_stats(stats, ...) with
// a C_SafeCond completion that reports into `ret`, then blocks on cond (with
// mylock) until done is set.
629 int librados::RadosClient::get_fs_stats(ceph_statfs
& stats
)
631 Mutex
mylock ("RadosClient::get_fs_stats::mylock");
637 objecter
->get_fs_stats(stats
, new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
641 while (!done
) cond
.Wait(mylock
);
// Runs its body while holding `lock`.
// NOTE(review): the statements after the lock acquisition are not visible in
// this excerpt.
647 void librados::RadosClient::get() {
648 Mutex::Locker
l(lock
);
// While holding `lock`, returns true exactly when refcnt is 0.
// NOTE(review): the update of refcnt before the return is not visible in
// this excerpt.
653 bool librados::RadosClient::put() {
654 Mutex::Locker
l(lock
);
657 return (refcnt
== 0);
// Pool creation using a C_SafeCond completion. After wait_for_osdmap(), calls
// objecter->create_pool(name, onfinish, auid, crush_rule); onfinish is bound
// to the local mylock, cond and done and reports into `reply`, which also
// receives create_pool's immediate return value.
// NOTE(review): the wait on the completion is not visible in this excerpt.
660 int librados::RadosClient::pool_create(string
& name
, unsigned long long auid
,
663 int r
= wait_for_osdmap();
668 Mutex
mylock ("RadosClient::pool_create::mylock");
672 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
673 reply
= objecter
->create_pool(name
, onfinish
, auid
, crush_rule
);
// Pool creation with caller-supplied completion `c`. After
// wait_for_osdmap(), wraps `c` in a C_PoolAsync_Safe and passes it to
// objecter->create_pool(name, onfinish, auid, crush_rule); r receives that
// call's return value.
686 int librados::RadosClient::pool_create_async(string
& name
, PoolAsyncCompletionImpl
*c
,
687 unsigned long long auid
,
690 int r
= wait_for_osdmap();
694 Context
*onfinish
= new C_PoolAsync_Safe(c
);
695 r
= objecter
->create_pool(name
, onfinish
, auid
, crush_rule
);
// Determines the base tier of pool_id. After wait_for_osdmap(), reads the
// pool's pg_pool_t inside objecter->with_osdmap. A pool whose tier_of is
// negative is its own base (*base_tier = pool_id); the other visible
// assignment stores pool->tier_of in *base_tier.
// NOTE(review): the null check on `pool` is not visible in this excerpt.
702 int librados::RadosClient::pool_get_base_tier(int64_t pool_id
, int64_t* base_tier
)
704 int r
= wait_for_osdmap();
709 objecter
->with_osdmap([&](const OSDMap
& o
) {
710 const pg_pool_t
* pool
= o
.get_pg_pool(pool_id
);
712 if (pool
->tier_of
< 0) {
713 *base_tier
= pool_id
;
715 *base_tier
= pool
->tier_of
;
// Pool deletion using a C_SafeCond completion. After wait_for_osdmap(), calls
// objecter->delete_pool(name, onfinish); onfinish is bound to the local
// mylock, cond and done and reports into `ret`, which also receives
// delete_pool's immediate return value.
// NOTE(review): the wait on the completion is not visible in this excerpt.
725 int librados::RadosClient::pool_delete(const char *name
)
727 int r
= wait_for_osdmap();
732 Mutex
mylock("RadosClient::pool_delete::mylock");
736 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &ret
);
737 ret
= objecter
->delete_pool(name
, onfinish
);
// Pool deletion with caller-supplied completion `c`. After
// wait_for_osdmap(), wraps `c` in a C_PoolAsync_Safe and passes it to
// objecter->delete_pool(name, onfinish); r receives that call's return
// value.
750 int librados::RadosClient::pool_delete_async(const char *name
, PoolAsyncCompletionImpl
*c
)
752 int r
= wait_for_osdmap();
756 Context
*onfinish
= new C_PoolAsync_Safe(c
);
757 r
= objecter
->delete_pool(name
, onfinish
);
// While holding `lock`, forwards the flag to objecter->blacklist_self(set).
764 void librados::RadosClient::blacklist_self(bool set
) {
765 Mutex::Locker
l(lock
);
766 objecter
->blacklist_self(set
);
// Blacklists client_address via a monitor command.
//   - The address is validated with addr.parse(); a failure is logged as
//     "unable to parse address".
//   - A JSON command with prefix "osd blacklist", blacklistop "add" and the
//     address is assembled in `cmd`; when expire_seconds is non-zero an
//     "expire" field is appended as "<expire_seconds>.0".
//   - The command is sent with mon_command(cmds, inbl, NULL, NULL).
//   - Finally wait_for_latest_osdmap() is called so that the caller has the
//     latest osd map epoch before proceeding.
769 int librados::RadosClient::blacklist_add(const string
& client_address
,
770 uint32_t expire_seconds
)
773 if (!addr
.parse(client_address
.c_str(), 0)) {
774 lderr(cct
) << "unable to parse address " << client_address
<< dendl
;
778 std::stringstream cmd
;
780 << "\"prefix\": \"osd blacklist\", "
781 << "\"blacklistop\": \"add\", "
782 << "\"addr\": \"" << client_address
<< "\"";
783 if (expire_seconds
!= 0) {
784 cmd
<< ", \"expire\": " << expire_seconds
<< ".0";
788 std::vector
<std::string
> cmds
;
789 cmds
.push_back(cmd
.str());
791 int r
= mon_command(cmds
, inbl
, NULL
, NULL
);
796 // ensure we have the latest osd map epoch before proceeding
797 r
= wait_for_latest_osdmap();
// Sends `cmd` with input `inbl` through monclient.start_mon_command; output
// goes to outbl / outs. The completion is a C_SafeCond bound to the local
// mylock, cond and done that reports into `rval`. No specific monitor is
// selected in this overload.
// NOTE(review): the wait on the completion is not visible in this excerpt.
801 int librados::RadosClient::mon_command(const vector
<string
>& cmd
,
802 const bufferlist
&inbl
,
803 bufferlist
*outbl
, string
*outs
)
805 Mutex
mylock("RadosClient::mon_command::mylock");
810 monclient
.start_mon_command(cmd
, inbl
, outbl
, outs
,
811 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
// Sends `cmd` to the manager: while holding `lock`, calls
// mgrclient.start_command(cmd, inbl, outbl, outs, &cond) and keeps its
// return value in r.
// NOTE(review): how the completion `cond` is awaited is not visible in this
// excerpt.
821 int librados::RadosClient::mgr_command(const vector
<string
>& cmd
,
822 const bufferlist
&inbl
,
823 bufferlist
*outbl
, string
*outs
)
825 Mutex::Locker
l(lock
);
828 int r
= mgrclient
.start_command(cmd
, inbl
, outbl
, outs
, &cond
);
// Overload that passes a monitor `rank` as the first argument of
// monclient.start_mon_command(rank, cmd, inbl, outbl, outs, ...). The
// completion is a C_SafeCond bound to the local mylock, cond and done that
// reports into `rval`.
// NOTE(review): the wait on the completion is not visible in this excerpt.
840 int librados::RadosClient::mon_command(int rank
, const vector
<string
>& cmd
,
841 const bufferlist
&inbl
,
842 bufferlist
*outbl
, string
*outs
)
844 Mutex
mylock("RadosClient::mon_command::mylock");
849 monclient
.start_mon_command(rank
, cmd
, inbl
, outbl
, outs
,
850 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
// Overload that passes a monitor `name` as the first argument of
// monclient.start_mon_command(name, cmd, inbl, outbl, outs, ...). The
// completion is a C_SafeCond bound to the local mylock, cond and done that
// reports into `rval`.
// NOTE(review): the wait on the completion is not visible in this excerpt.
859 int librados::RadosClient::mon_command(string name
, const vector
<string
>& cmd
,
860 const bufferlist
&inbl
,
861 bufferlist
*outbl
, string
*outs
)
863 Mutex
mylock("RadosClient::mon_command::mylock");
868 monclient
.start_mon_command(name
, cmd
, inbl
, outbl
, outs
,
869 new C_SafeCond(&mylock
, &cond
, &done
, &rval
));
// Sends `cmd` to OSD number `osd` via objecter->osd_command; output goes to
// poutbl / prs. The completion is a C_SafeCond bound to the local mylock,
// cond and done that reports into `ret`. The transaction id written to `tid`
// is not used further in the visible code (see the XXX note).
// NOTE(review): validation of `osd` and the wait on the completion are not
// visible in this excerpt.
878 int librados::RadosClient::osd_command(int osd
, vector
<string
>& cmd
,
879 const bufferlist
& inbl
,
880 bufferlist
*poutbl
, string
*prs
)
882 Mutex
mylock("RadosClient::osd_command::mylock");
892 // XXX do anything with tid?
893 objecter
->osd_command(osd
, cmd
, inbl
, &tid
, poutbl
, prs
,
894 new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
// Sends `cmd` to placement group `pgid` via objecter->pg_command; output
// goes to poutbl / prs. The completion is a C_SafeCond bound to the local
// mylock, cond and done that reports into `ret`.
// NOTE(review): the wait on the completion is not visible in this excerpt.
903 int librados::RadosClient::pg_command(pg_t pgid
, vector
<string
>& cmd
,
904 const bufferlist
& inbl
,
905 bufferlist
*poutbl
, string
*prs
)
907 Mutex
mylock("RadosClient::pg_command::mylock");
913 objecter
->pg_command(pgid
, cmd
, inbl
, &tid
, poutbl
, prs
,
914 new C_SafeCond(&mylock
, &cond
, &done
, &ret
));
// Installs or removes the cluster-log callbacks. Runs under `lock` and checks
// that the client is CONNECTED.
//   - cb == NULL and cb2 == NULL: the current callbacks are logged as being
//     removed and the existing subscription is dropped with
//     monclient.sub_unwant(log_watch).
//   - Otherwise `level` is mapped to a subscription name:
//       "debug"            -> "log-debug"
//       "info"             -> "log-info"
//       "warn" / "warning" -> "log-warn"
//       "err" / "error"    -> "log-error"
//       "sec"              -> "log-sec"
//     any other value is logged as " invalid level ".
//   - If a callback was already registered, the old subscription is dropped
//     first; then sub_want(watch_level, 0, 0) and renew_subs() are issued and
//     log_watch is updated to watch_level.
// NOTE(review): the return codes and the stores into log_cb / log_cb2 /
// log_cb_arg are not visible in this excerpt.
923 int librados::RadosClient::monitor_log(const string
& level
,
924 rados_log_callback_t cb
,
925 rados_log_callback2_t cb2
,
928 Mutex::Locker
l(lock
);
930 if (state
!= CONNECTED
) {
934 if (cb
== NULL
&& cb2
== NULL
) {
936 ldout(cct
, 10) << __func__
<< " removing cb " << (void*)log_cb
937 << " " << (void*)log_cb2
<< dendl
;
938 monclient
.sub_unwant(log_watch
);
947 if (level
== "debug") {
948 watch_level
= "log-debug";
949 } else if (level
== "info") {
950 watch_level
= "log-info";
951 } else if (level
== "warn" || level
== "warning") {
952 watch_level
= "log-warn";
953 } else if (level
== "err" || level
== "error") {
954 watch_level
= "log-error";
955 } else if (level
== "sec") {
956 watch_level
= "log-sec";
958 ldout(cct
, 10) << __func__
<< " invalid level " << level
<< dendl
;
962 if (log_cb
|| log_cb2
)
963 monclient
.sub_unwant(log_watch
);
966 ldout(cct
, 10) << __func__
<< " add cb " << (void*)cb
<< " " << (void*)cb2
967 << " level " << level
<< dendl
;
968 monclient
.sub_want(watch_level
, 0, 0);
970 monclient
.renew_subs();
974 log_watch
= watch_level
;
// Delivers an incoming MLog to the registered log callbacks. Asserts that
// `lock` is held.
//   - If m->version is newer than log_last_version, the latter is advanced.
//   - When log_cb or log_cb2 is set, every entry of m->entries is processed:
//     a line "<stamp> <name> <prio> <msg>" is built in `ss`, who / name /
//     level are stringified, and the stamp is converted to a timespec.
//   - log_cb receives (arg, line, who, sec, nsec, seq, level, msg); log_cb2
//     additionally receives the entry name after `who`.
//   - Finally monclient.sub_got(log_watch, log_last_version) records the
//     version reached.
// NOTE(review): the per-callback null checks and the release of `m` are not
// visible in this excerpt.
978 void librados::RadosClient::handle_log(MLog
*m
)
980 assert(lock
.is_locked());
981 ldout(cct
, 10) << __func__
<< " version " << m
->version
<< dendl
;
983 if (log_last_version
< m
->version
) {
984 log_last_version
= m
->version
;
986 if (log_cb
|| log_cb2
) {
987 for (std::deque
<LogEntry
>::iterator it
= m
->entries
.begin(); it
!= m
->entries
.end(); ++it
) {
990 ss
<< e
.stamp
<< " " << e
.name
<< " " << e
.prio
<< " " << e
.msg
;
991 string line
= ss
.str();
992 string who
= stringify(e
.who
);
993 string name
= stringify(e
.name
);
994 string level
= stringify(e
.prio
);
995 struct timespec stamp
;
996 e
.stamp
.to_timespec(&stamp
);
998 ldout(cct
, 20) << __func__
<< " delivering " << ss
.str() << dendl
;
1000 log_cb(log_cb_arg
, line
.c_str(), who
.c_str(),
1001 stamp
.tv_sec
, stamp
.tv_nsec
,
1002 e
.seq
, level
.c_str(), e
.msg
.c_str());
1004 log_cb2(log_cb_arg
, line
.c_str(), who
.c_str(), name
.c_str(),
1005 stamp
.tv_sec
, stamp
.tv_nsec
,
1006 e
.seq
, level
.c_str(), e
.msg
.c_str());
1010 monclient
.sub_got(log_watch
, log_last_version
);