#include "common/ceph_context.h"
#include "common/config.h"
#include "common/common_init.h"
+#include "common/ceph_json.h"
#include "common/errno.h"
+#include "common/ceph_json.h"
#include "include/buffer.h"
#include "include/stringify.h"
#include "include/util.h"
#include "PoolAsyncCompletionImpl.h"
#include "RadosClient.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "common/EventTrace.h"
#define dout_subsys ceph_subsys_rados
#define dout_prefix *_dout << "librados: "
bool librados::RadosClient::ms_get_authorizer(int dest_type,
- AuthAuthorizer **authorizer,
- bool force_new) {
+ AuthAuthorizer **authorizer) {
//ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
/* monitor authorization is being handled on different layer */
if (dest_type == CEPH_ENTITY_TYPE_MON)
});
}
-int librados::RadosClient::pool_get_auid(uint64_t pool_id,
- unsigned long long *auid)
-{
- int r = wait_for_osdmap();
- if (r < 0)
- return r;
- objecter->with_osdmap([&](const OSDMap& o) {
- const pg_pool_t *pg = o.get_pg_pool(pool_id);
- if (!pg) {
- r = -ENOENT;
- } else {
- r = 0;
- *auid = pg->auid;
- }
- });
- return r;
-}
-
-int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s)
+int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
{
int r = wait_for_osdmap();
if (r < 0)
return r;
+ retry:
objecter->with_osdmap([&](const OSDMap& o) {
if (!o.have_pg_pool(pool_id)) {
r = -ENOENT;
*s = o.get_pool_name(pool_id);
}
});
+ if (r == -ENOENT && wait_latest_map) {
+ r = wait_for_latest_osdmap();
+ if (r < 0)
+ return r;
+ wait_latest_map = false;
+ goto retry;
+ }
+
return r;
}
{
if (!s)
return -EINVAL;
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
ostringstream oss;
oss << monclient.get_fsid();
*s = oss.str();
int librados::RadosClient::connect()
{
- common_init_finish(cct);
-
int err;
// already connected?
return -EISCONN;
state = CONNECTING;
+ if (cct->_conf->log_early &&
+ !cct->_log->is_started()) {
+ cct->_log->start();
+ }
+
+ {
+ MonClient mc_bootstrap(cct);
+ err = mc_bootstrap.get_monmap_and_config();
+ if (err < 0)
+ return err;
+ }
+
+ common_init_finish(cct);
+
// get monmap
err = monclient.build_initial_monmap();
if (err < 0)
// how to decompose the reply data into its constituent pieces.
messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
- ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
+ ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
ldout(cct, 1) << "starting objecter" << dendl;
}
messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
+ // Detect older cluster, put mgrclient into compatible mode
+ mgrclient.set_mgr_optional(
+ !get_required_monitor_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS));
+
// MgrClient needs this (it doesn't have MonClient reference itself)
monclient.sub_want("mgrmap", 0, 0);
monclient.renew_subs();
return instance_id;
}
+int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ objecter->with_osdmap(
+ [require_osd_release](const OSDMap& o) {
+ *require_osd_release = o.require_osd_release;
+ });
+ return 0;
+}
+
+int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
+ int8_t* require_min_compat_client)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ objecter->with_osdmap(
+ [min_compat_client, require_min_compat_client](const OSDMap& o) {
+ *min_compat_client = o.get_min_compat_client();
+ *require_min_compat_client = o.get_require_min_compat_client();
+ });
+ return 0;
+}
+
librados::RadosClient::~RadosClient()
{
if (messenger)
int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
{
+ std::string pool_name;
+ int r = pool_get_name(pool_id, &pool_name, true);
+ if (r < 0)
+ return r;
*io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
return 0;
}
{
bool ret;
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
if (state == DISCONNECTED) {
ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
m->put();
bool librados::RadosClient::_dispatch(Message *m)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
switch (m->get_type()) {
// OSD
case CEPH_MSG_OSD_MAP:
int librados::RadosClient::wait_for_osdmap()
{
- assert(!lock.is_locked_by_me());
+ ceph_assert(!lock.is_locked_by_me());
if (state != CONNECTED) {
return -ENOTCONN;
});
if (need_map) {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
- utime_t timeout;
- if (cct->_conf->rados_mon_op_timeout > 0)
- timeout.set_from_double(cct->_conf->rados_mon_op_timeout);
+ ceph::timespan timeout{0};
+ if (cct->_conf->rados_mon_op_timeout > 0) {
+ timeout = ceph::make_timespan(cct->_conf->rados_mon_op_timeout);
+ }
if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
ldout(cct, 10) << __func__ << " waiting" << dendl;
- utime_t start = ceph_clock_now();
while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
- if (timeout.is_zero()) {
+ if (timeout == timeout.zero()) {
cond.Wait(lock);
} else {
- cond.WaitInterval(lock, timeout);
- utime_t elapsed = ceph_clock_now() - start;
- if (elapsed > timeout) {
+ int r = cond.WaitInterval(lock, timeout);
+ if (r == ETIMEDOUT) {
lderr(cct) << "timed out waiting for first osdmap from monitors"
<< dendl;
return -ETIMEDOUT;
}
void librados::RadosClient::get() {
- Mutex::Locker l(lock);
- assert(refcnt > 0);
+ std::lock_guard l(lock);
+ ceph_assert(refcnt > 0);
refcnt++;
}
bool librados::RadosClient::put() {
- Mutex::Locker l(lock);
- assert(refcnt > 0);
+ std::lock_guard l(lock);
+ ceph_assert(refcnt > 0);
refcnt--;
return (refcnt == 0);
}
-int librados::RadosClient::pool_create(string& name, unsigned long long auid,
+int librados::RadosClient::pool_create(string& name,
int16_t crush_rule)
{
+ if (!name.length())
+ return -EINVAL;
+
int r = wait_for_osdmap();
if (r < 0) {
return r;
Cond cond;
bool done;
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
- reply = objecter->create_pool(name, onfinish, auid, crush_rule);
+ reply = objecter->create_pool(name, onfinish, crush_rule);
if (reply < 0) {
delete onfinish;
return reply;
}
-int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
- unsigned long long auid,
+int librados::RadosClient::pool_create_async(string& name,
+ PoolAsyncCompletionImpl *c,
int16_t crush_rule)
{
int r = wait_for_osdmap();
return r;
Context *onfinish = new C_PoolAsync_Safe(c);
- r = objecter->create_pool(name, onfinish, auid, crush_rule);
+ r = objecter->create_pool(name, onfinish, crush_rule);
if (r < 0) {
delete onfinish;
}
}
void librados::RadosClient::blacklist_self(bool set) {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
objecter->blacklist_self(set);
}
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
C_SaferCond cond;
int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
return r;
lock.Unlock();
- r = cond.wait();
+ if (conf->rados_mon_op_timeout) {
+ r = cond.wait_for(conf->rados_mon_op_timeout);
+ } else {
+ r = cond.wait();
+ }
lock.Lock();
return r;
rados_log_callback2_t cb2,
void *arg)
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
if (state != CONNECTED) {
return -ENOTCONN;
void librados::RadosClient::handle_log(MLog *m)
{
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
ldout(cct, 10) << __func__ << " version " << m->version << dendl;
if (log_last_version < m->version) {
ostringstream ss;
ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
string line = ss.str();
- string who = stringify(e.who);
+ string who = stringify(e.rank) + " " + stringify(e.addrs);
string name = stringify(e.name);
string level = stringify(e.prio);
struct timespec stamp;
}
int librados::RadosClient::service_daemon_update_status(
- const std::map<std::string,std::string>& status)
+ std::map<std::string,std::string>&& status)
{
if (state != CONNECTED) {
return -ENOTCONN;
}
- return mgrclient.service_daemon_update_status(status);
+ return mgrclient.service_daemon_update_status(std::move(status));
}
mon_feature_t librados::RadosClient::get_required_monitor_features() const
{
- return monclient.monmap.get_required_features();
+ return monclient.with_monmap([](const MonMap &monmap) {
+ return monmap.get_required_features(); } );
+}
+
+int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
+ std::vector<std::string>* pgs)
+{
+ vector<string> cmd = {
+ "{\"prefix\": \"pg ls\","
+ "\"pool\": " + std::to_string(pool_id) + ","
+ "\"states\": [\"inconsistent\"],"
+ "\"format\": \"json\"}"
+ };
+ bufferlist inbl, outbl;
+ string outstring;
+ if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
+ return ret;
+ }
+ if (!outbl.length()) {
+ // no pg returned
+ return 0;
+ }
+ JSONParser parser;
+ if (!parser.parse(outbl.c_str(), outbl.length())) {
+ return -EINVAL;
+ }
+ vector<string> v;
+ if (!parser.is_array()) {
+ JSONObj *pgstat_obj = parser.find_obj("pg_stats");
+ if (!pgstat_obj)
+ return 0;
+ auto s = pgstat_obj->get_data();
+ JSONParser pg_stats;
+ if (!pg_stats.parse(s.c_str(), s.length())) {
+ return -EINVAL;
+ }
+ v = pg_stats.get_array_elements();
+ } else {
+ v = parser.get_array_elements();
+ }
+ for (auto i : v) {
+ JSONParser pg_json;
+ if (!pg_json.parse(i.c_str(), i.length())) {
+ return -EINVAL;
+ }
+ string pgid;
+ JSONDecoder::decode_json("pgid", pgid, &pg_json);
+ pgs->emplace_back(std::move(pgid));
+ }
+ return 0;
}