#include "osdc/Filer.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "common/admin_socket.h"
#include "common/errno.h"
{
}
-bool Client::CommandHook::call(std::string_view command,
- const cmdmap_t& cmdmap,
- std::string_view format, bufferlist& out)
+int Client::CommandHook::call(
+ std::string_view command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out)
{
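+ // The admin socket hook now receives a caller-owned Formatter and an
+ // error stream, and reports status via the int return value; flushing
+ // f to `out` is the caller's job, so the explicit flush below is gone.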
- std::unique_ptr<Formatter> f(Formatter::create(format));
f->open_object_section("result");
- m_client->client_lock.Lock();
- if (command == "mds_requests")
- m_client->dump_mds_requests(f.get());
- else if (command == "mds_sessions")
- m_client->dump_mds_sessions(f.get());
- else if (command == "dump_cache")
- m_client->dump_cache(f.get());
- else if (command == "kick_stale_sessions")
- m_client->_kick_stale_sessions();
- else if (command == "status")
- m_client->dump_status(f.get());
- else
- ceph_abort_msg("bad command registered");
- m_client->client_lock.Unlock();
+ {
+ std::lock_guard l{m_client->client_lock};
+ if (command == "mds_requests")
+ m_client->dump_mds_requests(f);
+ else if (command == "mds_sessions")
+ m_client->dump_mds_sessions(f);
+ else if (command == "dump_cache")
+ m_client->dump_cache(f);
+ else if (command == "kick_stale_sessions")
+ m_client->_kick_stale_sessions();
+ else if (command == "status")
+ m_client->dump_status(f);
+ else
+ ceph_abort_msg("bad command registered");
+ }
f->close_section();
- f->flush(out);
- return true;
+ return 0;
}
Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
: Dispatcher(m->cct),
timer(m->cct, client_lock),
- client_lock("Client::client_lock"),
messenger(m),
monclient(mc),
objecter(objecter_),
Client::~Client()
{
- ceph_assert(!client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_not_locked(client_lock));
// It is necessary to hold client_lock, because any inode destruction
// may call into ObjectCacher, which asserts that its lock (which is
// client_lock) is held.
- client_lock.Lock();
+ std::lock_guard l{client_lock};
tear_down_cache();
- client_lock.Unlock();
}
void Client::tear_down_cache()
void Client::dump_status(Formatter *f)
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
ldout(cct, 1) << __func__ << dendl;
{
timer.init();
objectcacher->start();
-
- client_lock.Lock();
- ceph_assert(!initialized);
-
- messenger->add_dispatcher_tail(this);
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(!initialized);
+ messenger->add_dispatcher_tail(this);
+ }
_finish_init();
return 0;
}
void Client::_finish_init()
{
- client_lock.Lock();
- // logger
- PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
- plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
- plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
- plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
- plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
- plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
- logger.reset(plb.create_perf_counters());
- cct->get_perfcounters_collection()->add(logger.get());
-
- client_lock.Unlock();
+ {
+ std::lock_guard l{client_lock};
+ // logger
+ PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+ plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+ plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+ plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+ plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+ plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+ logger.reset(plb.create_perf_counters());
+ cct->get_perfcounters_collection()->add(logger.get());
+ }
cct->_conf.add_observer(this);
AdminSocket* admin_socket = cct->get_admin_socket();
int ret = admin_socket->register_command("mds_requests",
- "mds_requests",
&m_command_hook,
"show in-progress mds requests");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("mds_sessions",
- "mds_sessions",
&m_command_hook,
"show mds session state");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("dump_cache",
- "dump_cache",
&m_command_hook,
"show in-memory metadata cache contents");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("kick_stale_sessions",
- "kick_stale_sessions",
&m_command_hook,
"kick sessions that were remote reset");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("status",
- "status",
&m_command_hook,
"show overall client status");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
- client_lock.Lock();
+ std::lock_guard l{client_lock};
initialized = true;
- client_lock.Unlock();
}
void Client::shutdown()
// If we were not mounted, but were being used for sending
// MDS commands, we may have sessions that need closing.
- client_lock.Lock();
- _close_sessions();
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ _close_sessions();
+ }
cct->_conf.remove_observer(this);
cct->get_admin_socket()->unregister_commands(&m_command_hook);
}
objectcacher->stop(); // outside of client_lock! this does a join.
-
- client_lock.Lock();
- ceph_assert(initialized);
- initialized = false;
- timer.shutdown();
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(initialized);
+ initialized = false;
+ timer.shutdown();
+ }
objecter_finisher.wait_for_empty();
objecter_finisher.stop();
ceph_assert(dn);
- if (dlease->mask & CEPH_LOCK_DN) {
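+ // check the lease validity bit rather than the old CEPH_LOCK_DN
+ // lock-type bit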
+ if (dlease->mask & CEPH_LEASE_VALID) {
if (dttl > dn->lease_ttl) {
ldout(cct, 10) << "got dentry lease on " << dn->name
<< " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
}
}
-int Client::verify_reply_trace(int r,
+int Client::verify_reply_trace(int r, MetaSession *session,
MetaRequest *request, const MConstRef<MClientReply>& reply,
InodeRef *ptarget, bool *pcreated,
const UserPerm& perms)
extra_bl = reply->get_extra_bl();
if (extra_bl.length() >= 8) {
- // if the extra bufferlist has a buffer, we assume its the created inode
- // and that this request to create succeeded in actually creating
- // the inode (won the race with other create requests)
- decode(created_ino, extra_bl);
- got_created_ino = true;
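+ // Newer MDSes encode an openc_response_t (created ino plus delegated
+ // inode ranges) in the extra bufferlist; older ones send a bare u64.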
+ if (session->mds_features.test(CEPHFS_FEATURE_DELEG_INO)) {
+ struct openc_response_t ocres;
+
+ decode(ocres, extra_bl);
+ created_ino = ocres.created_ino;
+ /*
+ * The userland cephfs client doesn't have a way to do an async create
+ * (yet), so just discard delegated_inos for now. Eventually we should
+ * store them and use them in create calls, even if they are synchronous,
+ * if only for testing purposes.
+ */
+ ldout(cct, 10) << "delegated_inos: " << ocres.delegated_inos << dendl;
+ } else {
+ // a bare u64 containing the created inode number
+ decode(created_ino, extra_bl);
+ }
ldout(cct, 10) << "make_request created ino " << created_ino << dendl;
+ got_created_ino = true;
}
if (pcreated)
if (use_mds >= 0)
request->resend_mds = use_mds;
+ MetaSession *session = NULL;
while (1) {
if (request->aborted())
break;
}
// set up wait cond
- Cond caller_cond;
+ ceph::condition_variable caller_cond;
request->caller_cond = &caller_cond;
// choose mds
}
// open a session?
- MetaSession *session = NULL;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
// wait for signal
ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
request->kick = false;
- while (!request->reply && // reply
- request->resend_mds < 0 && // forward
- !request->kick)
- caller_cond.Wait(client_lock);
- request->caller_cond = NULL;
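+ // client_lock is held on entry; adopt it into a unique_lock for the
+ // wait, then release() ownership afterwards so it stays locked for
+ // the caller.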
+ std::unique_lock l{client_lock, std::adopt_lock};
+ caller_cond.wait(l, [request] {
+ return (request->reply || // reply
+ request->resend_mds >= 0 || // forward
+ request->kick);
+ });
+ l.release();
+ request->caller_cond = nullptr;
// did we get a reply?
if (request->reply)
// kick dispatcher (we've got it!)
ceph_assert(request->dispatch_cond);
- request->dispatch_cond->Signal();
+ request->dispatch_cond->notify_all();
ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
request->dispatch_cond = 0;
if (r >= 0 && ptarget)
- r = verify_reply_trace(r, request, reply, ptarget, pcreated, perms);
+ r = verify_reply_trace(r, session, request, reply, ptarget, pcreated, perms);
if (pdirbl)
*pdirbl = reply->get_extra_bl();
}
}
- auto m = MClientSession::create(CEPH_SESSION_REQUEST_OPEN);
+ auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
m->metadata = metadata;
m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
session->con->send_message2(std::move(m));
{
ldout(cct, 2) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
s->state = MetaSession::STATE_CLOSING;
- s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+ s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}
void Client::_closed_mds_session(MetaSession *s)
s->state = MetaSession::STATE_CLOSED;
s->con->mark_down();
signal_context_list(s->waiting_for_open);
- mount_cond.Signal();
+ mount_cond.notify_all();
remove_session_caps(s);
kick_requests_closed(s);
mds_sessions.erase(s->mds_num);
renew_caps(session);
session->state = MetaSession::STATE_OPEN;
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
else
connect_mds_targets(from);
signal_context_list(session->waiting_for_open);
if (auto& m = session->release; m) {
session->con->send_message2(std::move(m));
}
- session->con->send_message2(MClientSession::create(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
+ session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
break;
case CEPH_SESSION_FORCE_RO:
bool Client::_any_stale_sessions() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
for (const auto &p : mds_sessions) {
if (p.second.state == MetaSession::STATE_STALE) {
session->con->send_message2(std::move(r));
}
-MClientRequest::ref Client::build_client_request(MetaRequest *request)
+ref_t<MClientRequest> Client::build_client_request(MetaRequest *request)
{
- auto req = MClientRequest::create(request->get_op());
+ auto req = make_message<MClientRequest>(request->get_op());
req->set_tid(request->tid);
req->set_stamp(request->op_stamp);
memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
request->item.remove_myself();
request->num_fwd = fwd->get_num_fwd();
request->resend_mds = fwd->get_dest_mds();
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
}
bool Client::is_dir_operation(MetaRequest *req)
request->sent_on_mseq == it->second.mseq)) {
ldout(cct, 20) << "have to return ESTALE" << dendl;
} else {
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
return;
}
}
// Only signal the caller once (on the first reply):
// Either it's an unsafe reply, or it's a safe reply and no unsafe reply was sent.
if (!is_safe || !request->got_unsafe) {
- Cond cond;
+ ceph::condition_variable cond;
request->dispatch_cond = &cond;
// wake up waiter
ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
// wake for kick back
- while (request->dispatch_cond) {
- ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
- cond.Wait(client_lock);
- }
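+ // same adopt/release dance: wait until the dispatcher clears
+ // dispatch_cond, logging once per wakeup while it is still set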
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [tid, request, &cond, this] {
+ if (request->dispatch_cond) {
+ ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+ << tid << " " << &cond << dendl;
+ }
+ return !request->dispatch_cond;
+ });
+ l.release();
}
if (is_safe) {
unregister_request(request);
}
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
}
void Client::_handle_full_flag(int64_t pool)
bool new_blacklist = false;
bool prenautilus = objecter->with_osdmap(
[&](const OSDMap& o) {
- return o.require_osd_release < CEPH_RELEASE_NAUTILUS;
+ return o.require_osd_release < ceph_release_t::nautilus;
});
if (!blacklisted) {
for (auto a : myaddrs.v) {
switch (m->get_type()) {
// mounting and mds sessions
case CEPH_MSG_MDS_MAP:
- handle_mds_map(MMDSMap::msgref_cast(m));
+ handle_mds_map(ref_cast<MMDSMap>(m));
break;
case CEPH_MSG_FS_MAP:
- handle_fs_map(MFSMap::msgref_cast(m));
+ handle_fs_map(ref_cast<MFSMap>(m));
break;
case CEPH_MSG_FS_MAP_USER:
- handle_fs_map_user(MFSMapUser::msgref_cast(m));
+ handle_fs_map_user(ref_cast<MFSMapUser>(m));
break;
case CEPH_MSG_CLIENT_SESSION:
- handle_client_session(MClientSession::msgref_cast(m));
+ handle_client_session(ref_cast<MClientSession>(m));
break;
case CEPH_MSG_OSD_MAP:
- handle_osd_map(MOSDMap::msgref_cast(m));
+ handle_osd_map(ref_cast<MOSDMap>(m));
break;
// requests
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
- handle_client_request_forward(MClientRequestForward::msgref_cast(m));
+ handle_client_request_forward(ref_cast<MClientRequestForward>(m));
break;
case CEPH_MSG_CLIENT_REPLY:
- handle_client_reply(MClientReply::msgref_cast(m));
+ handle_client_reply(ref_cast<MClientReply>(m));
break;
// reclaim reply
case CEPH_MSG_CLIENT_RECLAIM_REPLY:
- handle_client_reclaim_reply(MClientReclaimReply::msgref_cast(m));
+ handle_client_reclaim_reply(ref_cast<MClientReclaimReply>(m));
break;
case CEPH_MSG_CLIENT_SNAP:
- handle_snap(MClientSnap::msgref_cast(m));
+ handle_snap(ref_cast<MClientSnap>(m));
break;
case CEPH_MSG_CLIENT_CAPS:
- handle_caps(MClientCaps::msgref_cast(m));
+ handle_caps(ref_cast<MClientCaps>(m));
break;
case CEPH_MSG_CLIENT_LEASE:
- handle_lease(MClientLease::msgref_cast(m));
+ handle_lease(ref_cast<MClientLease>(m));
break;
case MSG_COMMAND_REPLY:
if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
- handle_command_reply(MCommandReply::msgref_cast(m));
+ handle_command_reply(ref_cast<MCommandReply>(m));
} else {
return false;
}
break;
case CEPH_MSG_CLIENT_QUOTA:
- handle_quota(MClientQuota::msgref_cast(m));
+ handle_quota(ref_cast<MClientQuota>(m));
break;
default:
trim_cache();
if (size < lru.lru_get_size() + inode_map.size()) {
ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
} else {
ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size()
<< "+" << inode_map.size() << dendl;
early_kick_flushing_caps(session);
- auto m = MClientReconnect::create();
+ auto m = make_message<MClientReconnect>();
bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
// i have an open session.
auto it = in->caps.find(mds);
if (it != in->caps.end()) {
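+ // compare against size_t so the message-size limit check does not
+ // trip -Wsign-compare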
if (allow_multi &&
- m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+ m->get_approx_size() >=
+ static_cast<size_t>((std::numeric_limits<int>::max() >> 1))) {
m->mark_more();
session->con->send_message2(std::move(m));
- m = MClientReconnect::create();
+ m = make_message<MClientReconnect>();
}
Cap &cap = it->second;
m->set_encoding_version(0); // use connection features to choose encoding
session->con->send_message2(std::move(m));
- mount_cond.Signal();
+ mount_cond.notify_all();
if (session->reclaim_state == MetaSession::RECLAIMING)
signal_cond_list(waiting_for_reclaim);
if (req->aborted()) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
continue;
}
if (req->mds == session->mds_num) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
req->item.remove_myself();
if (req->got_unsafe) {
s->seq++;
ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl;
if (s->state == MetaSession::STATE_CLOSING) {
- s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+ s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}
}
}
in = inode_map[vino];
- if (m->get_mask() & CEPH_LOCK_DN) {
+ if (m->get_mask() & CEPH_LEASE_VALID) {
if (!in->dir || in->dir->dentries.count(m->dname) == 0) {
ldout(cct, 10) << " don't have dir|dentry " << m->get_ino() << "/" << m->dname <<dendl;
goto revoke;
revoke:
{
- auto reply = MClientLease::create(CEPH_MDS_LEASE_RELEASE, seq, m->get_mask(), m->get_ino(), m->get_first(), m->get_last(), m->dname);
+ auto reply = make_message<MClientLease>(CEPH_MDS_LEASE_RELEASE, seq,
+ m->get_mask(), m->get_ino(),
+ m->get_first(), m->get_last(), m->dname);
m->get_connection()->send_message2(std::move(reply));
}
}
public:
C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
void finish(int r) override {
- ceph_assert(client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
if (r != 0) {
client_t const whoami = client->whoami; // For the benefit of ldout prefix
ldout(client->cct, 1) << "I/O error from flush on inode " << inode
if (flush)
follows = in->snaprealm->get_snap_context().seq;
- auto m = MClientCaps::create(op,
+ auto m = make_message<MClientCaps>(op,
in->ino,
0,
cap->cap_id, cap->seq,
void Client::send_flush_snap(Inode *in, MetaSession *session,
snapid_t follows, CapSnap& capsnap)
{
- auto m = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP,
- in->ino, in->snaprealm->ino, 0,
- in->auth_cap->mseq, cap_epoch_barrier);
+ auto m = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP,
+ in->ino, in->snaprealm->ino, 0,
+ in->auth_cap->mseq, cap_epoch_barrier);
m->caller_uid = capsnap.cap_dirtier_uid;
m->caller_gid = capsnap.cap_dirtier_gid;
}
}
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
ls.push_back(&cond);
- cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l);
+ l.release();
ls.remove(&cond);
}
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
{
- for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
- (*it)->Signal();
+ for (auto cond : ls) {
+ cond->notify_all();
+ }
}
void Client::wait_on_context_list(list<Context*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
bool done = false;
int r;
- ls.push_back(new C_Cond(&cond, &done, &r));
- while (!done)
- cond.Wait(client_lock);
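+ // C_Cond now takes the condition_variable by reference; the predicate
+ // overload of wait() handles spurious wakeups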
+ ls.push_back(new C_Cond(cond, &done, &r));
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [&done] { return done;});
+ l.release();
}
void Client::signal_context_list(list<Context*>& ls)
}
void finish(int r) override {
// _async_invalidate takes the lock when it needs to, so call it back from outside the lock.
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_invalidate(ino, offset, length);
}
};
void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
if (!in->oset.dirty_or_tx) {
ldout(cct, 10) << " nothing to flush" << dendl;
return;
offset, size, &onflush);
if (!ret) {
// wait for flush
- client_lock.Unlock();
+ client_lock.unlock();
onflush.wait();
- client_lock.Lock();
+ client_lock.lock();
}
}
void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
{
// std::lock_guard l(client_lock);
- ceph_assert(client_lock.is_locked()); // will be called via dispatch() -> objecter -> ...
+ ceph_assert(ceph_mutex_is_locked(client_lock)); // will be called via dispatch() -> objecter -> ...
Inode *in = static_cast<Inode *>(oset->parent);
ceph_assert(in);
_flushed(in);
signal_cond_list(in->waitfor_caps);
}
s->flushing_caps_tids.clear();
- sync_cond.Signal();
+ sync_cond.notify_all();
}
int Client::_do_remount(bool retry_on_error)
if (oldest_tid <= want) {
ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
<< " (want " << want << ")" << dendl;
- sync_cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ sync_cond.wait(l);
+ l.release();
goto retry;
}
}
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
if (!dirty) {
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
} else {
ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
}
void finish(int r) override {
// _async_dentry_invalidate is responsible for its own locking
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_dentry_invalidate(dirino, ino, name);
}
};
*/
int Client::authenticate()
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
if (monclient->is_authenticated()) {
return 0;
}
- client_lock.Unlock();
+ client_lock.unlock();
int r = monclient->authenticate(cct->_conf->client_mount_timeout);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0) {
return r;
}
do {
C_SaferCond cond;
monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
- client_lock.Unlock();
+ client_lock.unlock();
r = cond.wait();
- client_lock.Lock();
+ client_lock.lock();
} while (r == -EAGAIN);
if (r < 0) {
std::string resolved_fs_name;
if (fs_name.empty()) {
- resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
+ resolved_fs_name = cct->_conf.get_val<std::string>("client_fs");
+ if (resolved_fs_name.empty())
+ // Try the backwards compatibility fs name option
+ resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
} else {
resolved_fs_name = fs_name;
}
// wait for sessions to close
ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
- mount_cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ mount_cond.wait(l);
+ l.release();
}
}
// will crash if they see an unknown CEPH_SESSION_* value in this msg.
const uint64_t features = session->con->get_features();
if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
- auto m = MClientSession::create(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
+ auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
session->con->send_message2(std::move(m));
}
}
req->abort(err);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
}
void Client::_unmount(bool abort)
{
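+ // _unmount() is entered with client_lock held; adopt it so the waits
+ // below can drop and retake it, and release() it again before returning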
+ std::unique_lock lock{client_lock, std::adopt_lock};
if (unmounting)
return;
flush_mdlog_sync();
}
- while (!mds_requests.empty()) {
- ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
- mount_cond.Wait(client_lock);
- }
-
+ mount_cond.wait(lock, [this] {
+ if (!mds_requests.empty()) {
+ ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+ << dendl;
+ }
+ return mds_requests.empty();
+ });
if (tick_event)
timer.cancel_event(tick_event);
tick_event = 0;
_ll_drop_pins();
- while (unsafe_sync_write > 0) {
- ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting" << dendl;
- mount_cond.Wait(client_lock);
- }
+ mount_cond.wait(lock, [this] {
+ if (unsafe_sync_write > 0) {
+ ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+ << dendl;
+ }
+ return unsafe_sync_write <= 0;
+ });
if (cct->_conf->client_oc) {
// flush/release all buffered data
<< "+" << inode_map.size() << " items"
<< ", waiting (for caps to release?)"
<< dendl;
- utime_t until = ceph_clock_now() + utime_t(5, 0);
- int r = mount_cond.WaitUntil(client_lock, until);
- if (r == ETIMEDOUT) {
+ if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+ r == std::cv_status::timeout) {
dump_cache(NULL);
}
}
mounted = false;
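+ // hand ownership of client_lock back to the caller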
+ lock.release();
ldout(cct, 2) << "unmounted." << dendl;
}
ldout(cct, 21) << "tick" << dendl;
tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
// Called back via Timer, which takes client_lock for us
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
tick();
}));
utime_t now = ceph_clock_now();
req->abort(-ETIMEDOUT);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
signal_cond_list(waiting_for_mdsmap);
for (auto &p : mds_sessions) {
ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
session->last_cap_renew_request = ceph_clock_now();
uint64_t seq = ++session->cap_renew_seq;
- session->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
+ session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
}
int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
int caps, bool getref)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
<< " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
<< dendl;
dn_name = dn->name; // fill in name while we have lock
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, in); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
<< " = " << r << dendl;
if (r < 0) {
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
<< " = " << r << dendl;
loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
{
Inode *in = f->inode.get();
- int r;
+ bool whence_check = false;
loff_t pos = -1;
- if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
- r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
- if (r < 0) {
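+ // SEEK_DATA and SEEK_HOLE are not defined on every platform, so only
+ // recognize them (and fetch the size up front) when the macros exist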
+ switch (whence) {
+ case SEEK_END:
+ whence_check = true;
+ break;
+
+#ifdef SEEK_DATA
+ case SEEK_DATA:
+ whence_check = true;
+ break;
+#endif
+
+#ifdef SEEK_HOLE
+ case SEEK_HOLE:
+ whence_check = true;
+ break;
+#endif
+ }
+
+ if (whence_check) {
+ int r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+ if (r < 0)
return r;
- }
}
switch (whence) {
pos = in->size + offset;
break;
+#ifdef SEEK_DATA
case SEEK_DATA:
- if (offset < 0 || offset >= in->size) {
- r = -ENXIO;
- return offset;
- }
+ if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+ return -ENXIO;
pos = offset;
break;
+#endif
+#ifdef SEEK_HOLE
case SEEK_HOLE:
- if (offset < 0 || offset >= in->size) {
- r = -ENXIO;
- pos = offset;
- } else {
- pos = in->size;
- }
+ if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+ return -ENXIO;
+ pos = in->size;
break;
+#endif
default:
ldout(cct, 1) << __func__ << ": invalid whence value " << whence << dendl;
ldout(cct, 10) << __func__ << " " << f << dendl;
if (f->pos_locked || !f->pos_waiters.empty()) {
- Cond cond;
+ ceph::condition_variable cond;
f->pos_waiters.push_back(&cond);
ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
- while (f->pos_locked || f->pos_waiters.front() != &cond)
- cond.Wait(client_lock);
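+ // wake up only when the pos lock is free and we are at the head of
+ // the waiter queue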
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [f, me=&cond] {
+ return !f->pos_locked && f->pos_waiters.front() == me;
+ });
+ l.release();
ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
ceph_assert(f->pos_waiters.front() == &cond);
f->pos_waiters.pop_front();
int r = _read(f, offset, size, &bl);
ldout(cct, 3) << "read(" << fd << ", " << (void*)buf << ", " << size << ", " << offset << ") = " << r << dendl;
if (r >= 0) {
- bl.copy(0, bl.length(), buf);
+ bl.begin().copy(bl.length(), buf);
r = bl.length();
}
return r;
// done!
if (onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
off, len, bl, 0, &onfinish);
if (r == 0) {
get_cap_ref(in, CEPH_CAP_FILE_CACHE);
- client_lock.Unlock();
+ client_lock.unlock();
r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
}
ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
- Mutex flock("Client::_read_sync flock");
- Cond cond;
while (left > 0) {
C_SaferCond onfinish("Client::_read_sync flock");
bufferlist tbl;
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
// if we get ENOENT from OSD, assume 0 bytes returned
if (r == -ENOENT)
ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
if (unsafe_sync_write == 0 && unmounting) {
ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
}
}
if (r <= 0)
return r;
- int bufoff = 0;
+ auto iter = bl.cbegin();
for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
/*
* This handles the case where the bufferlist does not have enough
* data to fill the iov.
*/
- if (resid < iov[j].iov_len) {
- bl.copy(bufoff, resid, (char *)iov[j].iov_base);
- break;
- } else {
- bl.copy(bufoff, iov[j].iov_len, (char *)iov[j].iov_base);
- }
- resid -= iov[j].iov_len;
- bufoff += iov[j].iov_len;
+ const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
+ iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
+ resid -= round_size;
+ /* iter is self-updating */
}
return r;
}
* change out from under us.
*/
if (f->flags & O_APPEND) {
- int r = _lseek(f, 0, SEEK_END);
+ auto r = _lseek(f, 0, SEEK_END);
if (r < 0) {
unlock_fh_pos(f);
return r;
uint32_t len = in->inline_data.length();
if (endoff < len)
- in->inline_data.copy(endoff, len - endoff, bl);
+ in->inline_data.begin(endoff).copy(len - endoff, bl); // XXX
if (offset < len)
in->inline_data.splice(offset, len - offset);
offset, size, bl, ceph::real_clock::now(), 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
_sync_write_commit(in);
}
done:
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int uninline_ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
in->inline_data.clear();
}
if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << "waiting on data to flush" << dendl;
r = object_cacher_completion->wait();
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
} else {
// FIXME: this can starve
objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
}
- client_lock.Unlock();
+ client_lock.unlock();
int rval = cond.wait();
assert(root);
total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
- client_lock.Lock();
+ client_lock.lock();
if (rval < 0) {
ldout(cct, 1) << "underlying call to statfs returned error: "
wait_sync_caps(flush_tid);
if (nullptr != cond) {
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
cond->wait();
ldout(cct, 15) << __func__ << " flush finished" << dendl;
- client_lock.Lock();
+ client_lock.lock();
}
return 0;
name += this_len;
size -= this_len;
}
-
- const VXattr *vxattr;
- for (vxattr = _get_vxattrs(in); vxattr && !vxattr->name.empty(); vxattr++) {
- if (vxattr->hidden)
- continue;
- // call pointer-to-member function
- if (vxattr->exists_cb && !(this->*(vxattr->exists_cb))(in))
- continue;
-
- size_t this_len = vxattr->name.length() + 1;
- r += this_len;
- if (len_only)
- continue;
-
- if (this_len > size) {
- r = -ERANGE;
- goto out;
- }
-
- memcpy(name, vxattr->name.c_str(), this_len);
- name += this_len;
- size -= this_len;
- }
out:
ldout(cct, 8) << __func__ << "(" << in->ino << ", " << size << ") = " << r << dendl;
return r;
name: CEPH_XATTR_NAME(_type, _name), \
getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
readonly: true, \
- hidden: false, \
exists_cb: NULL, \
flags: 0, \
}
name: CEPH_XATTR_NAME(_type, _name), \
getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
readonly: true, \
- hidden: false, \
exists_cb: NULL, \
flags: _flags, \
}
name: CEPH_XATTR_NAME2(_type, _name, _field), \
getxattr_cb: &Client::_vxattrcb_ ## _name ## _ ## _field, \
readonly: false, \
- hidden: true, \
exists_cb: &Client::_vxattrcb_layout_exists, \
flags: 0, \
}
name: CEPH_XATTR_NAME(_type, _name), \
getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
readonly: false, \
- hidden: true, \
exists_cb: &Client::_vxattrcb_quota_exists, \
flags: 0, \
}
name: "ceph.dir.layout",
getxattr_cb: &Client::_vxattrcb_layout,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_layout_exists,
flags: 0,
},
name: "ceph.quota",
getxattr_cb: &Client::_vxattrcb_quota,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_quota_exists,
flags: 0,
},
name: "ceph.dir.pin",
getxattr_cb: &Client::_vxattrcb_dir_pin,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_dir_pin_exists,
flags: 0,
},
name: "ceph.snap.btime",
getxattr_cb: &Client::_vxattrcb_snap_btime,
readonly: true,
- hidden: false,
exists_cb: &Client::_vxattrcb_snap_btime_exists,
flags: 0,
},
name: "ceph.file.layout",
getxattr_cb: &Client::_vxattrcb_layout,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_layout_exists,
flags: 0,
},
name: "ceph.snap.btime",
getxattr_cb: &Client::_vxattrcb_snap_btime,
readonly: true,
- hidden: false,
exists_cb: &Client::_vxattrcb_snap_btime_exists,
flags: 0,
},
else
return -EROFS;
}
- if (fromdir != todir) {
- Inode *fromdir_root =
- fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
- Inode *todir_root =
- todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
- if (fromdir_root != todir_root) {
- return -EXDEV;
- }
- }
InodeRef target;
MetaRequest *req = new MetaRequest(op);
req->dentry_unless = CEPH_CAP_FILE_EXCL;
InodeRef oldin, otherin;
- res = _lookup(fromdir, fromname, 0, &oldin, perm);
+ Inode *fromdir_root = nullptr;
+ Inode *todir_root = nullptr;
+ int mask = 0;
+ bool quota_check = false;
+ if (fromdir != todir) {
+ fromdir_root =
+ fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
+ todir_root =
+ todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
+
+ if (todir_root->quota.is_enable() && fromdir_root != todir_root) {
+ // Use the CEPH_STAT_RSTAT mask to force a getattr or lookup request
+ // to the auth MDS, fetching the latest rstat for todir_root and the
+ // source dir even if their dentry caches and inode caps are satisfied.
+ res = _getattr(todir_root, CEPH_STAT_RSTAT, perm, true);
+ if (res < 0)
+ goto fail;
+
+ quota_check = true;
+ if (oldde->inode && oldde->inode->is_dir()) {
+ mask |= CEPH_STAT_RSTAT;
+ }
+ }
+ }
+
+ res = _lookup(fromdir, fromname, mask, &oldin, perm);
if (res < 0)
goto fail;
req->set_old_inode(oldinode);
req->old_inode_drop = CEPH_CAP_LINK_SHARED;
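+ // For a rename across quota roots, check the target root's quota
+ // up front: a directory that would exceed it fails with -EXDEV (so
+ // tools like mv fall back to copy+unlink), a file with -EDQUOT.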
+ if (quota_check) {
+ int64_t old_bytes, old_files;
+ if (oldinode->is_dir()) {
+ old_bytes = oldinode->rstat.rbytes;
+ old_files = oldinode->rstat.rsize();
+ } else {
+ old_bytes = oldinode->size;
+ old_files = 1;
+ }
+
+ bool quota_exceed = false;
+ if (todir_root && todir_root->quota.max_bytes &&
+ (old_bytes + todir_root->rstat.rbytes) >= todir_root->quota.max_bytes) {
+ ldout(cct, 10) << "_rename (" << oldinode->ino << " bytes="
+ << old_bytes << ") to (" << todir->ino
+ << ") will exceed quota on " << *todir_root << dendl;
+ quota_exceed = true;
+ }
+
+ if (todir_root && todir_root->quota.max_files &&
+ (old_files + todir_root->rstat.rsize()) >= todir_root->quota.max_files) {
+ ldout(cct, 10) << "_rename (" << oldinode->ino << " files="
+ << old_files << ") to (" << todir->ino
+ << ") will exceed quota on " << *todir_root << dendl;
+ quota_exceed = true;
+ }
+
+ if (quota_exceed) {
+ res = (oldinode->is_dir()) ? -EXDEV : -EDQUOT;
+ goto fail;
+ }
+ }
+
res = _lookup(todir, toname, 0, &otherin, perm);
switch (res) {
case 0:
CEPH_OSD_FLAG_READ,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
if (r >= 0) {
- bl.copy(0, bl.length(), buf);
+ bl.begin().copy(bl.length(), buf);
r = bl.length();
}
fakesnap.seq = snapseq;
/* lock just in time */
- client_lock.Lock();
+ client_lock.lock();
if (unmounting) {
- client_lock.Unlock();
+ client_lock.unlock();
return -ENOTCONN;
}
0,
onsafe.get());
- client_lock.Unlock();
+ client_lock.unlock();
if (nullptr != onsafe) {
r = onsafe->wait();
}
if (in->inline_version < CEPH_INLINE_NONE &&
(have & CEPH_CAP_FILE_BUFFER)) {
bufferlist bl;
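+ // walk the inline data with an iterator: copy the head, step over the
+ // punched range, then copy the tail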
+ auto inline_iter = in->inline_data.cbegin();
int len = in->inline_data.length();
if (offset < len) {
if (offset > 0)
- in->inline_data.copy(0, offset, bl);
+ inline_iter.copy(offset, bl);
int size = length;
if (offset + size > len)
size = len - offset;
if (size > 0)
bl.append_zero(size);
- if (offset + size < len)
- in->inline_data.copy(offset + size, len - offset - size, bl);
+ if (offset + size < len) {
+ inline_iter += size;
+ inline_iter.copy(len - offset - size, bl);
+ }
in->inline_data = bl;
in->inline_version++;
}
in->change_attr++;
in->mark_caps_dirty(CEPH_CAP_FILE_WR);
- client_lock.Unlock();
+ client_lock.unlock();
onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
_sync_write_commit(in);
}
} else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
}
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
return false;
}
-bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer)
-{
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
- *authorizer = monclient->build_authorizer(dest_type);
- return true;
-}
-
Inode *Client::get_quota_root(Inode *in, const UserPerm& perms)
{
Inode *quota_in = root_ancestor;
bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
{
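+ // Compute the unreported size from *this* inode before entering the
+ // lambda: its `in` parameter is the quota-root candidate, which
+ // shadows the outer inode.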
+ ceph_assert(in->size >= in->reported_size);
+ const uint64_t size = in->size - in->reported_size;
return check_quota_condition(in, perms,
- [](const Inode &in) {
+ [&size](const Inode &in) {
if (in.quota.max_bytes) {
if (in.rstat.rbytes >= in.quota.max_bytes) {
return true;
}
- ceph_assert(in.size >= in.reported_size);
const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
- const uint64_t size = in.size - in.reported_size;
return (space >> 4) < size;
} else {
return false;
objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
- client_lock.Unlock();
+ client_lock.unlock();
int rd_ret = rd_cond.wait();
int wr_ret = wr_cond.wait();
- client_lock.Lock();
+ client_lock.lock();
bool errored = false;
if (session->reclaim_state == MetaSession::RECLAIM_NULL ||
session->reclaim_state == MetaSession::RECLAIMING) {
session->reclaim_state = MetaSession::RECLAIMING;
- auto m = MClientReclaim::create(uuid, flags);
+ auto m = make_message<MClientReclaim>(uuid, flags);
session->con->send_message2(std::move(m));
wait_on_list(waiting_for_reclaim);
} else if (session->reclaim_state == MetaSession::RECLAIM_FAIL) {
C_SaferCond cond;
if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
- client_lock.Unlock();
+ client_lock.unlock();
cond.wait();
- client_lock.Lock();
+ client_lock.lock();
}
bool blacklisted = objecter->with_osdmap(
for (auto &p : mds_sessions) {
p.second.reclaim_state = MetaSession::RECLAIM_NULL;
- auto m = MClientReclaim::create("", MClientReclaim::FLAG_FINISH);
+ auto m = make_message<MClientReclaim>("", MClientReclaim::FLAG_FINISH);
p.second.con->send_message2(std::move(m));
}
mds_rank_t Client::_get_random_up_mds() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
std::set<mds_rank_t> up;
mdsmap->get_up_mds_set(up);
objectcacher->start();
objecter->init();
- client_lock.Lock();
+ client_lock.lock();
ceph_assert(!is_initialized());
messenger->add_dispatcher_tail(objecter);
if (r < 0) {
// need to do cleanup because we're in an intermediate init state
timer.shutdown();
- client_lock.Unlock();
+ client_lock.unlock();
objecter->shutdown();
objectcacher->stop();
monclient->shutdown();
}
objecter->start();
- client_lock.Unlock();
+ client_lock.unlock();
_finish_init();
return 0;