#include "osdc/Filer.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "common/admin_socket.h"
#include "common/errno.h"
#include "include/ceph_assert.h"
#include "include/stat.h"
-#include "include/cephfs/ceph_statx.h"
+#include "include/cephfs/ceph_ll_client.h"
#if HAVE_GETGROUPLIST
#include <grp.h>
#define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)
+using namespace TOPNSPC::common;
+
void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
{
Client *client = static_cast<Client*>(p);
client->flush_set_callback(oset);
}
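+// Reject access to MDS-internal (reserved) inode numbers; these must never
+// be visible to userland callers.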
+bool Client::is_reserved_vino(vinodeno_t &vino) {
+ if (MDS_IS_PRIVATE_INO(vino.ino)) {
+ ldout(cct, -1) << __func__ << " attempt to access reserved inode number " << vino << dendl;
+ return true;
+ }
+ return false;
+}
+
// -------------
{
}
-bool Client::CommandHook::call(std::string_view command,
- const cmdmap_t& cmdmap,
- std::string_view format, bufferlist& out)
+int Client::CommandHook::call(
+ std::string_view command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out)
{
- std::unique_ptr<Formatter> f(Formatter::create(format));
f->open_object_section("result");
- m_client->client_lock.Lock();
- if (command == "mds_requests")
- m_client->dump_mds_requests(f.get());
- else if (command == "mds_sessions")
- m_client->dump_mds_sessions(f.get());
- else if (command == "dump_cache")
- m_client->dump_cache(f.get());
- else if (command == "kick_stale_sessions")
- m_client->_kick_stale_sessions();
- else if (command == "status")
- m_client->dump_status(f.get());
- else
- ceph_abort_msg("bad command registered");
- m_client->client_lock.Unlock();
+ {
+ std::lock_guard l{m_client->client_lock};
+ if (command == "mds_requests")
+ m_client->dump_mds_requests(f);
+ else if (command == "mds_sessions") {
+ bool cap_dump = false;
+ cmd_getval(cmdmap, "cap_dump", cap_dump);
+ m_client->dump_mds_sessions(f, cap_dump);
+ } else if (command == "dump_cache")
+ m_client->dump_cache(f);
+ else if (command == "kick_stale_sessions")
+ m_client->_kick_stale_sessions();
+ else if (command == "status")
+ m_client->dump_status(f);
+ else
+ ceph_abort_msg("bad command registered");
+ }
f->close_section();
- f->flush(out);
- return true;
+ return 0;
}
Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
: Dispatcher(m->cct),
timer(m->cct, client_lock),
- client_lock("Client::client_lock"),
messenger(m),
monclient(mc),
objecter(objecter_),
async_dentry_invalidator(m->cct),
interrupt_finisher(m->cct),
remount_finisher(m->cct),
+ async_ino_releasor(m->cct),
objecter_finisher(m->cct),
m_command_hook(this),
fscid(0)
cct->_conf->client_oc_target_dirty,
cct->_conf->client_oc_max_dirty_age,
true));
- objecter_finisher.start();
- filer.reset(new Filer(objecter, &objecter_finisher));
- objecter->enable_blacklist_events();
}
Client::~Client()
{
- ceph_assert(!client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_not_locked(client_lock));
// It is necessary to hold client_lock, because any inode destruction
// may call into ObjectCacher, which asserts that its lock (which is
// client_lock) is held.
- client_lock.Lock();
+ std::lock_guard l{client_lock};
tear_down_cache();
- client_lock.Unlock();
}
void Client::tear_down_cache()
void Client::dump_status(Formatter *f)
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
ldout(cct, 1) << __func__ << dendl;
f->dump_int("osd_epoch", osd_epoch);
f->dump_int("osd_epoch_barrier", cap_epoch_barrier);
f->dump_bool("blacklisted", blacklisted);
+ f->dump_string("fs_name", mdsmap->get_fs_name());
}
}
-int Client::init()
+void Client::_pre_init()
{
timer.init();
- objectcacher->start();
- client_lock.Lock();
- ceph_assert(!initialized);
+ objecter_finisher.start();
+ filer.reset(new Filer(objecter, &objecter_finisher));
+ objecter->enable_blacklist_events();
- messenger->add_dispatcher_tail(this);
- client_lock.Unlock();
+ objectcacher->start();
+}
+int Client::init()
+{
+ _pre_init();
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(!initialized);
+ messenger->add_dispatcher_tail(this);
+ }
_finish_init();
return 0;
}
void Client::_finish_init()
{
- client_lock.Lock();
- // logger
- PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
- plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
- plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
- plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
- plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
- plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
- logger.reset(plb.create_perf_counters());
- cct->get_perfcounters_collection()->add(logger.get());
-
- client_lock.Unlock();
+ {
+ std::lock_guard l{client_lock};
+ // logger
+ PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+ plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+ plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+ plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+ plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+ plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+ logger.reset(plb.create_perf_counters());
+ cct->get_perfcounters_collection()->add(logger.get());
+ }
cct->_conf.add_observer(this);
AdminSocket* admin_socket = cct->get_admin_socket();
int ret = admin_socket->register_command("mds_requests",
- "mds_requests",
&m_command_hook,
"show in-progress mds requests");
if (ret < 0) {
lderr(cct) << "error registering admin socket command: "
<< cpp_strerror(-ret) << dendl;
}
- ret = admin_socket->register_command("mds_sessions",
- "mds_sessions",
+ ret = admin_socket->register_command("mds_sessions "
+ "name=cap_dump,type=CephBool,req=false",
&m_command_hook,
"show mds session state");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("dump_cache",
- "dump_cache",
&m_command_hook,
"show in-memory metadata cache contents");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("kick_stale_sessions",
- "kick_stale_sessions",
&m_command_hook,
"kick sessions that were remote reset");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
ret = admin_socket->register_command("status",
- "status",
&m_command_hook,
"show overall client status");
if (ret < 0) {
<< cpp_strerror(-ret) << dendl;
}
- client_lock.Lock();
+ std::lock_guard l{client_lock};
initialized = true;
- client_lock.Unlock();
}
void Client::shutdown()
// If we were not mounted, but were being used for sending
// MDS commands, we may have sessions that need closing.
- client_lock.Lock();
- _close_sessions();
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ _close_sessions();
+ }
cct->_conf.remove_observer(this);
cct->get_admin_socket()->unregister_commands(&m_command_hook);
remount_finisher.stop();
}
- objectcacher->stop(); // outside of client_lock! this does a join.
-
- client_lock.Lock();
- ceph_assert(initialized);
- initialized = false;
- timer.shutdown();
- client_lock.Unlock();
+ if (ino_release_cb) {
+ ldout(cct, 10) << "shutdown stopping inode release finisher" << dendl;
+ async_ino_releasor.wait_for_empty();
+ async_ino_releasor.stop();
+ }
+ objectcacher->stop(); // outside of client_lock! this does a join.
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(initialized);
+ initialized = false;
+ timer.shutdown();
+ }
objecter_finisher.wait_for_empty();
objecter_finisher.stop();
<< dendl;
if (dn->inode) {
Inode *diri = dn->dir->parent_inode;
- diri->dir_release_count++;
clear_dir_complete_and_ordered(diri, true);
}
unlink(dn, false, false); // drop dir, drop dentry
if (old_dentry) {
if (old_dentry->dir != dir) {
Inode *old_diri = old_dentry->dir->parent_inode;
- old_diri->dir_ordered_count++;
clear_dir_complete_and_ordered(old_diri, false);
}
unlink(old_dentry, dir == old_dentry->dir, false); // drop dentry, keep dir open if its the same dir
}
Inode *diri = dir->parent_inode;
- diri->dir_ordered_count++;
clear_dir_complete_and_ordered(diri, false);
dn = link(dir, dname, in, dn);
}
ceph_assert(dn);
- if (dlease->mask & CEPH_LOCK_DN) {
+ if (dlease->mask & CEPH_LEASE_VALID) {
if (dttl > dn->lease_ttl) {
ldout(cct, 10) << "got dentry lease on " << dn->name
<< " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
}
}
dn->cap_shared_gen = dn->dir->parent_inode->shared_gen;
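+ // remember whether the MDS says this dentry is the inode's primary link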
+ if (dlease->mask & CEPH_LEASE_PRIMARY_LINK)
+ dn->mark_primary();
}
void Client::clear_dir_complete_and_ordered(Inode *diri, bool complete)
{
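+ // bump the counts here so callers cannot forget to invalidate cached
+ // readdir state (the per-call-site increments were folded into this helper)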
+ if (complete)
+ diri->dir_release_count++;
+ else
+ diri->dir_ordered_count++;
if (diri->flags & I_COMPLETE) {
if (complete) {
ldout(cct, 10) << " clearing (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
Dentry *d = request->dentry();
if (d) {
Inode *diri = d->dir->parent_inode;
- diri->dir_release_count++;
clear_dir_complete_and_ordered(diri, true);
}
if (diri->dir && diri->dir->dentries.count(dname)) {
dn = diri->dir->dentries[dname];
if (dn->inode) {
- diri->dir_ordered_count++;
clear_dir_complete_and_ordered(diri, false);
unlink(dn, true, true); // keep dir, dentry
}
}
}
-void Client::dump_mds_sessions(Formatter *f)
+void Client::dump_mds_sessions(Formatter *f, bool cap_dump)
{
f->dump_int("id", get_nodeid().v);
entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr_legacy());
f->open_array_section("sessions");
for (const auto &p : mds_sessions) {
f->open_object_section("session");
- p.second.dump(f);
+ p.second.dump(f, cap_dump);
f->close_section();
}
f->close_section();
}
}
-int Client::verify_reply_trace(int r,
+int Client::verify_reply_trace(int r, MetaSession *session,
MetaRequest *request, const MConstRef<MClientReply>& reply,
InodeRef *ptarget, bool *pcreated,
const UserPerm& perms)
extra_bl = reply->get_extra_bl();
if (extra_bl.length() >= 8) {
- // if the extra bufferlist has a buffer, we assume its the created inode
- // and that this request to create succeeded in actually creating
- // the inode (won the race with other create requests)
- decode(created_ino, extra_bl);
- got_created_ino = true;
+ if (session->mds_features.test(CEPHFS_FEATURE_DELEG_INO)) {
+ struct openc_response_t ocres;
+
+ decode(ocres, extra_bl);
+ created_ino = ocres.created_ino;
+ /*
+ * The userland cephfs client doesn't have a way to do an async create
+ * (yet), so just discard delegated_inos for now. Eventually we should
+ * store them and use them in create calls, even if they are synchronous,
+ * if only for testing purposes.
+ */
+ ldout(cct, 10) << "delegated_inos: " << ocres.delegated_inos << dendl;
+ } else {
+ // u64 containing number of created ino
+ decode(created_ino, extra_bl);
+ }
ldout(cct, 10) << "make_request created ino " << created_ino << dendl;
+ got_created_ino = true;
}
if (pcreated)
if (use_mds >= 0)
request->resend_mds = use_mds;
+ MetaSession *session = NULL;
while (1) {
if (request->aborted())
break;
}
// set up wait cond
- Cond caller_cond;
+ ceph::condition_variable caller_cond;
request->caller_cond = &caller_cond;
// choose mds
}
// open a session?
- MetaSession *session = NULL;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
-
+ if (session->state == MetaSession::STATE_REJECTED) {
+ request->abort(-EPERM);
+ break;
+ }
// wait
if (session->state == MetaSession::STATE_OPENING) {
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
- // Abort requests on REJECT from MDS
- if (rejected_by_mds.count(mds)) {
- request->abort(-EPERM);
- break;
- }
continue;
}
// wait for signal
ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
request->kick = false;
- while (!request->reply && // reply
- request->resend_mds < 0 && // forward
- !request->kick)
- caller_cond.Wait(client_lock);
- request->caller_cond = NULL;
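+ // client_lock is already held: adopt it into a unique_lock for the
+ // condition wait, then release() so it remains held on return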
+ std::unique_lock l{client_lock, std::adopt_lock};
+ caller_cond.wait(l, [request] {
+ return (request->reply || // reply
+ request->resend_mds >= 0 || // forward
+ request->kick);
+ });
+ l.release();
+ request->caller_cond = nullptr;
// did we get a reply?
if (request->reply)
// kick dispatcher (we've got it!)
ceph_assert(request->dispatch_cond);
- request->dispatch_cond->Signal();
+ request->dispatch_cond->notify_all();
ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
request->dispatch_cond = 0;
if (r >= 0 && ptarget)
- r = verify_reply_trace(r, request, reply, ptarget, pcreated, perms);
+ r = verify_reply_trace(r, session, request, reply, ptarget, pcreated, perms);
if (pdirbl)
*pdirbl = reply->get_extra_bl();
{
ldout(cct, 20) << __func__ << " enter(in:" << *in << ", req:" << req
<< " mds:" << mds << ", drop:" << drop << ", unless:" << unless
- << ", have:" << ", force:" << force << ")" << dendl;
+ << ", force:" << force << ")" << dendl;
int released = 0;
auto it = in->caps.find(mds);
if (it != in->caps.end()) {
drop &= ~(in->dirty_caps | get_caps_used(in));
if ((drop & cap.issued) &&
!(unless & cap.issued)) {
- ldout(cct, 25) << "Dropping caps. Initial " << ccap_string(cap.issued) << dendl;
+ ldout(cct, 25) << "dropping caps " << ccap_string(drop) << dendl;
cap.issued &= ~drop;
cap.implemented &= ~drop;
released = 1;
- ldout(cct, 25) << "Now have: " << ccap_string(cap.issued) << dendl;
} else {
released = force;
}
if (released) {
+ cap.wanted = in->caps_wanted();
+ if (&cap == in->auth_cap &&
+ !(cap.wanted & CEPH_CAP_ANY_FILE_WR)) {
+ in->requested_max_size = 0;
+ ldout(cct, 25) << "reset requested_max_size due to not wanting any file write cap" << dendl;
+ }
ceph_mds_request_release rel;
rel.ino = in->ino;
rel.cap_id = cap.cap_id;
rel.item.dname_len = dn->name.length();
rel.item.dname_seq = dn->lease_seq;
rel.dname = dn->name;
+ dn->lease_mds = -1;
}
ldout(cct, 25) << __func__ << " exit(dn:"
<< dn << ")" << dendl;
ceph_assert(em.second); /* not already present */
MetaSession *session = &em.first->second;
- // Maybe skip sending a request to open if this MDS daemon
- // has previously sent us a REJECT.
- if (rejected_by_mds.count(mds)) {
- if (rejected_by_mds[mds] == session->addrs) {
- ldout(cct, 4) << __func__ << " mds." << mds << " skipping "
- "because we were rejected" << dendl;
- return session;
- } else {
- ldout(cct, 4) << __func__ << " mds." << mds << " old inst "
- "rejected us, trying with new inst" << dendl;
- rejected_by_mds.erase(mds);
- }
- }
-
- auto m = MClientSession::create(CEPH_SESSION_REQUEST_OPEN);
+ auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
m->metadata = metadata;
m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
session->con->send_message2(std::move(m));
{
ldout(cct, 2) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
s->state = MetaSession::STATE_CLOSING;
- s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+ s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}
-void Client::_closed_mds_session(MetaSession *s)
+void Client::_closed_mds_session(MetaSession *s, int err, bool rejected)
{
ldout(cct, 5) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
- s->state = MetaSession::STATE_CLOSED;
+ if (rejected && s->state != MetaSession::STATE_CLOSING)
+ s->state = MetaSession::STATE_REJECTED;
+ else
+ s->state = MetaSession::STATE_CLOSED;
s->con->mark_down();
signal_context_list(s->waiting_for_open);
- mount_cond.Signal();
- remove_session_caps(s);
+ mount_cond.notify_all();
+ remove_session_caps(s, err);
kick_requests_closed(s);
- mds_sessions.erase(s->mds_num);
+ mds_ranks_closing.erase(s->mds_num);
+ if (s->state == MetaSession::STATE_CLOSED)
+ mds_sessions.erase(s->mds_num);
}
void Client::handle_client_session(const MConstRef<MClientSession>& m)
if (!missing_features.empty()) {
lderr(cct) << "mds." << from << " lacks required features '"
<< missing_features << "', closing session " << dendl;
- rejected_by_mds[session->mds_num] = session->addrs;
_close_mds_session(session);
- _closed_mds_session(session);
+ _closed_mds_session(session, -EPERM, true);
break;
}
session->mds_features = std::move(m->supported_features);
renew_caps(session);
session->state = MetaSession::STATE_OPEN;
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
else
connect_mds_targets(from);
signal_context_list(session->waiting_for_open);
if (auto& m = session->release; m) {
session->con->send_message2(std::move(m));
}
- session->con->send_message2(MClientSession::create(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
+ session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
break;
case CEPH_SESSION_FORCE_RO:
error_str = "unknown error";
lderr(cct) << "mds." << from << " rejected us (" << error_str << ")" << dendl;
- rejected_by_mds[session->mds_num] = session->addrs;
- _closed_mds_session(session);
+ _closed_mds_session(session, -EPERM, true);
}
break;
bool Client::_any_stale_sessions() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
for (const auto &p : mds_sessions) {
if (p.second.state == MetaSession::STATE_STALE) {
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
MetaSession &s = it->second;
+ if (s.state == MetaSession::STATE_REJECTED) {
+ mds_sessions.erase(it++);
+ continue;
+ }
++it;
if (s.state == MetaSession::STATE_STALE)
_closed_mds_session(&s);
session->con->send_message2(std::move(r));
}
-MClientRequest::ref Client::build_client_request(MetaRequest *request)
+ref_t<MClientRequest> Client::build_client_request(MetaRequest *request)
{
- auto req = MClientRequest::create(request->get_op());
+ auto req = make_message<MClientRequest>(request->get_op());
req->set_tid(request->tid);
req->set_stamp(request->op_stamp);
memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
request->item.remove_myself();
request->num_fwd = fwd->get_num_fwd();
request->resend_mds = fwd->get_dest_mds();
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
}
bool Client::is_dir_operation(MetaRequest *req)
request->sent_on_mseq == it->second.mseq)) {
ldout(cct, 20) << "have to return ESTALE" << dendl;
} else {
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
return;
}
}
// Only signal the caller once (on the first reply):
// Either its an unsafe reply, or its a safe reply and no unsafe reply was sent.
if (!is_safe || !request->got_unsafe) {
- Cond cond;
+ ceph::condition_variable cond;
request->dispatch_cond = &cond;
// wake up waiter
ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
// wake for kick back
- while (request->dispatch_cond) {
- ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
- cond.Wait(client_lock);
- }
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [tid, request, &cond, this] {
+ if (request->dispatch_cond) {
+ ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+ << tid << " " << &cond << dendl;
+ }
+ return !request->dispatch_cond;
+ });
+ l.release();
}
if (is_safe) {
unregister_request(request);
}
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
}
void Client::_handle_full_flag(int64_t pool)
bool new_blacklist = false;
bool prenautilus = objecter->with_osdmap(
[&](const OSDMap& o) {
- return o.require_osd_release < CEPH_RELEASE_NAUTILUS;
+ return o.require_osd_release < ceph_release_t::nautilus;
});
if (!blacklisted) {
for (auto a : myaddrs.v) {
switch (m->get_type()) {
// mounting and mds sessions
case CEPH_MSG_MDS_MAP:
- handle_mds_map(MMDSMap::msgref_cast(m));
+ handle_mds_map(ref_cast<MMDSMap>(m));
break;
case CEPH_MSG_FS_MAP:
- handle_fs_map(MFSMap::msgref_cast(m));
+ handle_fs_map(ref_cast<MFSMap>(m));
break;
case CEPH_MSG_FS_MAP_USER:
- handle_fs_map_user(MFSMapUser::msgref_cast(m));
+ handle_fs_map_user(ref_cast<MFSMapUser>(m));
break;
case CEPH_MSG_CLIENT_SESSION:
- handle_client_session(MClientSession::msgref_cast(m));
+ handle_client_session(ref_cast<MClientSession>(m));
break;
case CEPH_MSG_OSD_MAP:
- handle_osd_map(MOSDMap::msgref_cast(m));
+ handle_osd_map(ref_cast<MOSDMap>(m));
break;
// requests
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
- handle_client_request_forward(MClientRequestForward::msgref_cast(m));
+ handle_client_request_forward(ref_cast<MClientRequestForward>(m));
break;
case CEPH_MSG_CLIENT_REPLY:
- handle_client_reply(MClientReply::msgref_cast(m));
+ handle_client_reply(ref_cast<MClientReply>(m));
break;
// reclaim reply
case CEPH_MSG_CLIENT_RECLAIM_REPLY:
- handle_client_reclaim_reply(MClientReclaimReply::msgref_cast(m));
+ handle_client_reclaim_reply(ref_cast<MClientReclaimReply>(m));
break;
case CEPH_MSG_CLIENT_SNAP:
- handle_snap(MClientSnap::msgref_cast(m));
+ handle_snap(ref_cast<MClientSnap>(m));
break;
case CEPH_MSG_CLIENT_CAPS:
- handle_caps(MClientCaps::msgref_cast(m));
+ handle_caps(ref_cast<MClientCaps>(m));
break;
case CEPH_MSG_CLIENT_LEASE:
- handle_lease(MClientLease::msgref_cast(m));
+ handle_lease(ref_cast<MClientLease>(m));
break;
case MSG_COMMAND_REPLY:
if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
- handle_command_reply(MCommandReply::msgref_cast(m));
+ handle_command_reply(ref_cast<MCommandReply>(m));
} else {
return false;
}
break;
case CEPH_MSG_CLIENT_QUOTA:
- handle_quota(MClientQuota::msgref_cast(m));
+ handle_quota(ref_cast<MClientQuota>(m));
break;
default:
trim_cache();
if (size < lru.lru_get_size() + inode_map.size()) {
ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
} else {
ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size()
<< "+" << inode_map.size() << dendl;
early_kick_flushing_caps(session);
- auto m = MClientReconnect::create();
+ auto m = make_message<MClientReconnect>();
bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
// i have an open session.
auto it = in->caps.find(mds);
if (it != in->caps.end()) {
if (allow_multi &&
- m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+ m->get_approx_size() >=
+ static_cast<size_t>((std::numeric_limits<int>::max() >> 1))) {
m->mark_more();
session->con->send_message2(std::move(m));
- m = MClientReconnect::create();
+ m = make_message<MClientReconnect>();
}
Cap &cap = it->second;
<< " wants " << ccap_string(in->caps_wanted())
<< dendl;
filepath path;
- in->make_long_path(path);
+ in->make_short_path(path);
ldout(cct, 10) << " path " << path << dendl;
bufferlist flockbl;
m->set_encoding_version(0); // use connection features to choose encoding
session->con->send_message2(std::move(m));
- mount_cond.Signal();
+ mount_cond.notify_all();
if (session->reclaim_state == MetaSession::RECLAIMING)
signal_cond_list(waiting_for_reclaim);
if (req->aborted()) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
continue;
}
if (req->mds == session->mds_num) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
req->item.remove_myself();
if (req->got_unsafe) {
s->seq++;
ldout(cct, 10) << " mds." << s->mds_num << " seq now " << s->seq << dendl;
if (s->state == MetaSession::STATE_CLOSING) {
- s->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_CLOSE, s->seq));
+ s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}
}
}
in = inode_map[vino];
- if (m->get_mask() & CEPH_LOCK_DN) {
+ if (m->get_mask() & CEPH_LEASE_VALID) {
if (!in->dir || in->dir->dentries.count(m->dname) == 0) {
ldout(cct, 10) << " don't have dir|dentry " << m->get_ino() << "/" << m->dname <<dendl;
goto revoke;
revoke:
{
- auto reply = MClientLease::create(CEPH_MDS_LEASE_RELEASE, seq, m->get_mask(), m->get_ino(), m->get_first(), m->get_last(), m->dname);
+ auto reply = make_message<MClientLease>(CEPH_MDS_LEASE_RELEASE, seq,
+ m->get_mask(), m->get_ino(),
+ m->get_first(), m->get_last(), m->dname);
m->get_connection()->send_message2(std::move(reply));
}
}
Dentry *olddn = in->get_first_parent();
ceph_assert(olddn->dir != dir || olddn->name != name);
Inode *old_diri = olddn->dir->parent_inode;
- old_diri->dir_release_count++;
clear_dir_complete_and_ordered(old_diri, true);
unlink(olddn, true, true); // keep dir, dentry
}
public:
C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
void finish(int r) override {
- ceph_assert(client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
if (r != 0) {
client_t const whoami = client->whoami; // For the benefit of ldout prefix
ldout(client->cct, 1) << "I/O error from flush on inode " << inode
int put_nref = 0;
int drop = last & ~in->caps_issued();
if (in->snapid == CEPH_NOSNAP) {
- if ((last & CEPH_CAP_FILE_WR) &&
+ if ((last & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER)) &&
!in->cap_snaps.empty() &&
in->cap_snaps.rbegin()->second.writing) {
ldout(cct, 10) << __func__ << " finishing pending cap_snap on " << *in << dendl;
}
}
-int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
+int Client::get_caps(Fh *fh, int need, int want, int *phave, loff_t endoff)
{
+ Inode *in = fh->inode.get();
+
int r = check_pool_perm(in, need);
if (r < 0)
return r;
return -EBADF;
}
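+ // fd_gen is bumped when sessions are reset (see tick()); a writable fh
+ // from an older generation may have lost its caps, so fail with EBADF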
+ if ((fh->mode & CEPH_FILE_MODE_WR) && fh->gen != fd_gen)
+ return -EBADF;
+
+ if ((in->flags & I_ERROR_FILELOCK) && fh->has_any_filelocks())
+ return -EIO;
+
int implemented;
int have = in->caps_issued(&implemented);
bool waitfor_commit = false;
if (have & need & CEPH_CAP_FILE_WR) {
- if (endoff > 0 &&
- (endoff >= (loff_t)in->max_size ||
- endoff > (loff_t)(in->size << 1)) &&
- endoff > (loff_t)in->wanted_max_size) {
- ldout(cct, 10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
- in->wanted_max_size = endoff;
- check_caps(in, 0);
+ if (endoff > 0) {
+ if ((endoff >= (loff_t)in->max_size ||
+ endoff > (loff_t)(in->size << 1)) &&
+ endoff > (loff_t)in->wanted_max_size) {
+ ldout(cct, 10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
+ in->wanted_max_size = endoff;
+ }
+ if (in->wanted_max_size > in->max_size &&
+ in->wanted_max_size > in->requested_max_size)
+ check_caps(in, 0);
}
if (endoff >= 0 && endoff > (loff_t)in->max_size) {
if (flush)
follows = in->snaprealm->get_snap_context().seq;
- auto m = MClientCaps::create(op,
+ auto m = make_message<MClientCaps>(op,
in->ino,
0,
cap->cap_id, cap->seq,
m->set_snap_follows(follows);
cap->wanted = want;
if (cap == in->auth_cap) {
- m->set_max_size(in->wanted_max_size);
- in->requested_max_size = in->wanted_max_size;
- ldout(cct, 15) << "auth cap, setting max_size = " << in->requested_max_size << dendl;
+ if (want & CEPH_CAP_ANY_FILE_WR) {
+ m->set_max_size(in->wanted_max_size);
+ in->requested_max_size = in->wanted_max_size;
+ ldout(cct, 15) << "auth cap, requesting max_size " << in->requested_max_size << dendl;
+ } else {
+ in->requested_max_size = 0;
+ ldout(cct, 15) << "auth cap, reset requested_max_size due to not wanting any file write cap" << dendl;
+ }
}
if (!session->flushing_caps_tids.empty())
}
int flushing;
+ int msg_flags = 0;
ceph_tid_t flush_tid;
if (in->auth_cap == &cap && in->dirty_caps) {
flushing = mark_caps_flushing(in, &flush_tid);
+ if (flags & CHECK_CAPS_SYNCHRONOUS)
+ msg_flags |= MClientCaps::FLAG_SYNC;
} else {
flushing = 0;
flush_tid = 0;
}
- int msg_flags = (flags & CHECK_CAPS_SYNCHRONOUS) ? MClientCaps::FLAG_SYNC : 0;
send_cap(in, session, &cap, msg_flags, cap_used, wanted, retain,
flushing, flush_tid);
}
capsnap.context = old_snapc;
capsnap.issued = in->caps_issued();
capsnap.dirty = in->caps_dirty();
-
+
capsnap.dirty_data = (used & CEPH_CAP_FILE_BUFFER);
-
+
capsnap.uid = in->uid;
capsnap.gid = in->gid;
capsnap.mode = in->mode;
capsnap.xattr_version = in->xattr_version;
capsnap.cap_dirtier_uid = in->cap_dirtier_uid;
capsnap.cap_dirtier_gid = in->cap_dirtier_gid;
-
+
if (used & CEPH_CAP_FILE_WR) {
ldout(cct, 10) << __func__ << " WR used on " << *in << dendl;
capsnap.writing = 1;
}
if (used & CEPH_CAP_FILE_BUFFER) {
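+ // buffered dirty data counts as still writing; the snap flush must wait
+ // for writeback to complete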
+ capsnap.writing = 1;
ldout(cct, 10) << __func__ << " " << *in << " cap_snap " << &capsnap << " used " << used
<< " WRBUFFER, delaying" << dendl;
} else {
}
}
-void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
-{
- ldout(cct, 10) << __func__ << " seq " << seq << " on " << *in << dendl;
- in->cap_snaps.at(seq).dirty_data = 0;
- flush_snaps(in);
-}
-
void Client::send_flush_snap(Inode *in, MetaSession *session,
snapid_t follows, CapSnap& capsnap)
{
- auto m = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP,
- in->ino, in->snaprealm->ino, 0,
- in->auth_cap->mseq, cap_epoch_barrier);
+ auto m = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP,
+ in->ino, in->snaprealm->ino, 0,
+ in->auth_cap->mseq, cap_epoch_barrier);
m->caller_uid = capsnap.cap_dirtier_uid;
m->caller_gid = capsnap.cap_dirtier_gid;
<< " on " << *in << dendl;
if (capsnap.dirty_data || capsnap.writing)
break;
-
+
capsnap.flush_tid = ++last_flush_tid;
session->flushing_caps_tids.insert(capsnap.flush_tid);
in->flushing_cap_tids[capsnap.flush_tid] = 0;
}
}
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
ls.push_back(&cond);
- cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l);
+ l.release();
ls.remove(&cond);
}
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
{
- for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
- (*it)->Signal();
+ for (auto cond : ls) {
+ cond->notify_all();
+ }
}
void Client::wait_on_context_list(list<Context*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
bool done = false;
int r;
- ls.push_back(new C_Cond(&cond, &done, &r));
- while (!done)
- cond.Wait(client_lock);
+ ls.push_back(new C_Cond(cond, &done, &r));
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [&done] { return done;});
+ l.release();
}
void Client::signal_context_list(list<Context*>& ls)
}
void finish(int r) override {
// _async_invalidate takes the lock when it needs to, call this back from outside of lock.
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_invalidate(ino, offset, length);
}
};
void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
if (!in->oset.dirty_or_tx) {
ldout(cct, 10) << " nothing to flush" << dendl;
return;
offset, size, &onflush);
if (!ret) {
// wait for flush
- client_lock.Unlock();
+ client_lock.unlock();
onflush.wait();
- client_lock.Lock();
+ client_lock.lock();
}
}
void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
{
// std::lock_guard l(client_lock);
- ceph_assert(client_lock.is_locked()); // will be called via dispatch() -> objecter -> ...
+ ceph_assert(ceph_mutex_is_locked(client_lock)); // will be called via dispatch() -> objecter -> ...
Inode *in = static_cast<Inode *>(oset->parent);
ceph_assert(in);
_flushed(in);
!(had & CEPH_CAP_FILE_CACHE))
in->cache_gen++;
- if ((issued & CEPH_CAP_FILE_SHARED) &&
- !(had & CEPH_CAP_FILE_SHARED)) {
- in->shared_gen++;
-
+ if ((issued & CEPH_CAP_FILE_SHARED) !=
+ (had & CEPH_CAP_FILE_SHARED)) {
+ if (issued & CEPH_CAP_FILE_SHARED)
+ in->shared_gen++;
if (in->is_dir())
clear_dir_complete_and_ordered(in, true);
}
remove_cap(&in->caps.begin()->second, true);
}
-void Client::remove_session_caps(MetaSession *s)
+void Client::remove_session_caps(MetaSession *s, int err)
{
ldout(cct, 10) << __func__ << " mds." << s->mds_num << dendl;
dirty_caps = in->dirty_caps | in->flushing_caps;
in->wanted_max_size = 0;
in->requested_max_size = 0;
+ if (in->has_any_filelocks())
+ in->flags |= I_ERROR_FILELOCK;
}
+ auto caps = cap->implemented;
if (cap->wanted | cap->issued)
in->flags |= I_CAP_DROPPED;
remove_cap(cap, false);
in->mark_caps_clean();
put_inode(in.get());
}
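+ // if Fc/Fb were implemented but are no longer issued, drop cached data:
+ // purge (recording an async error) when blacklisted, otherwise just
+ // release the clean pages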
+ caps &= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER;
+ if (caps && !in->caps_issued_mask(caps, true)) {
+ if (err == -EBLACKLISTED) {
+ if (in->oset.dirty_or_tx) {
+ lderr(cct) << __func__ << " still has dirty data on " << *in << dendl;
+ in->set_async_err(err);
+ }
+ objectcacher->purge_set(&in->oset);
+ } else {
+ objectcacher->release_set(&in->oset);
+ }
+ _schedule_invalidate_callback(in.get(), 0, 0);
+ }
+
signal_cond_list(in->waitfor_caps);
}
s->flushing_caps_tids.clear();
- sync_cond.Signal();
+ sync_cond.notify_all();
}
int Client::_do_remount(bool retry_on_error)
{
- uint64_t max_retries = g_conf().get_val<uint64_t>("mds_max_retries_on_remount_failure");
+ uint64_t max_retries = cct->_conf.get_val<uint64_t>("mds_max_retries_on_remount_failure");
errno = 0;
int r = remount_cb(callback_handle);
}
}
+class C_Client_CacheRelease : public Context {
+private:
+ Client *client;
+ vinodeno_t ino;
+public:
+ C_Client_CacheRelease(Client *c, Inode *in) :
+ client(c) {
+ if (client->use_faked_inos())
+ ino = vinodeno_t(in->faked_ino, CEPH_NOSNAP);
+ else
+ ino = in->vino();
+ }
+ void finish(int r) override {
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
+ client->_async_inode_release(ino);
+ }
+};
+
+void Client::_async_inode_release(vinodeno_t ino)
+{
+ if (unmounting)
+ return;
+ ldout(cct, 10) << __func__ << " " << ino << dendl;
+ ino_release_cb(callback_handle, ino);
+}
+
+void Client::_schedule_ino_release_callback(Inode *in)
+{
+ if (ino_release_cb)
+ // queue the release; the finisher thread invokes the callback outside client_lock
+ async_ino_releasor.queue(new C_Client_CacheRelease(this, in));
+}
+
void Client::trim_caps(MetaSession *s, uint64_t max)
{
mds_rank_t mds = s->mds_num;
++q;
if (dn->lru_is_expireable()) {
if (can_invalidate_dentries &&
- dn->dir->parent_inode->ino == MDS_INO_ROOT) {
+ dn->dir->parent_inode->ino == CEPH_INO_ROOT) {
// Only issue one of these per DN for inodes in root: handle
// others more efficiently by calling for root-child DNs at
// the end of this function.
all = false;
}
}
- if (all && in->ino != MDS_INO_ROOT) {
+ if (in->ll_ref == 1 && in->ino != CEPH_INO_ROOT) {
+ _schedule_ino_release_callback(in.get());
+ }
+ if (all && in->ino != CEPH_INO_ROOT) {
ldout(cct, 20) << __func__ << " counting as trimmed: " << *in << dendl;
trimmed++;
}
if (oldest_tid <= want) {
ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
<< " (want " << want << ")" << dendl;
- sync_cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ sync_cond.wait(l);
+ l.release();
goto retry;
}
}
ldout(cct, 10) << __func__ << " " << *realm << " seq " << info.seq()
<< " <= " << realm->seq << " and same parent, SKIPPING" << dendl;
}
-
+
if (!first_realm)
first_realm = realm;
else
put_snap_realm(realm);
}
- for (map<SnapRealm*, SnapContext>::iterator q = dirty_realms.begin();
- q != dirty_realms.end();
- ++q) {
- SnapRealm *realm = q->first;
+ for (auto &[realm, snapc] : dirty_realms) {
// if there are new snaps ?
- if (has_new_snaps(q->second, realm->get_snap_context())) {
+ if (has_new_snaps(snapc, realm->get_snap_context())) {
ldout(cct, 10) << " flushing caps on " << *realm << dendl;
- xlist<Inode*>::iterator r = realm->inodes_with_caps.begin();
- while (!r.end()) {
- Inode *in = *r;
- ++r;
- queue_cap_snap(in, q->second);
+ for (auto&& in : realm->inodes_with_caps) {
+ queue_cap_snap(in, snapc);
}
} else {
ldout(cct, 10) << " no new snap on " << *realm << dendl;
SnapRealm *realm = NULL;
update_snap_trace(m->snapbl, &realm);
+ int issued = m->get_caps();
+ int wanted = m->get_wanted();
add_update_cap(in, session, m->get_cap_id(),
- m->get_caps(), m->get_wanted(), m->get_seq(), m->get_mseq(),
+ issued, wanted, m->get_seq(), m->get_mseq(),
m->get_realm(), CEPH_CAP_FLAG_AUTH, cap_perms);
if (cap && cap->cap_id == m->peer.cap_id) {
put_snap_realm(realm);
if (in->auth_cap && in->auth_cap->session == session) {
+ if (!(wanted & CEPH_CAP_ANY_FILE_WR) ||
+ in->requested_max_size > m->get_max_size()) {
+ in->requested_max_size = 0;
+ ldout(cct, 15) << "reset requested_max_size after cap import" << dendl;
+ }
// reflush any/all caps (if we are now the auth_cap)
kick_flushing_caps(in, session);
}
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
if (!dirty) {
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
} else {
ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
}
void finish(int r) override {
// _async_dentry_invalidate is responsible for its own locking
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_dentry_invalidate(dirino, ino, name);
}
};
return;
ldout(cct, 10) << __func__ << " '" << name << "' ino " << ino
<< " in dir " << dirino << dendl;
- dentry_invalidate_cb(callback_handle, dirino, ino, name);
+ dentry_invalidate_cb(callback_handle, dirino, ino, name.c_str(), name.length());
}
void Client::_schedule_invalidate_dentry_callback(Dentry *dn, bool del)
*/
int Client::authenticate()
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
if (monclient->is_authenticated()) {
return 0;
}
- client_lock.Unlock();
+ client_lock.unlock();
int r = monclient->authenticate(cct->_conf->client_mount_timeout);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0) {
return r;
}
do {
C_SaferCond cond;
monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
- client_lock.Unlock();
+ client_lock.unlock();
r = cond.wait();
- client_lock.Lock();
+ client_lock.lock();
} while (r == -EAGAIN);
if (r < 0) {
std::string resolved_fs_name;
if (fs_name.empty()) {
- resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
+ resolved_fs_name = cct->_conf.get_val<std::string>("client_fs");
+ if (resolved_fs_name.empty())
+ // Try the backwards compatibility fs name option
+ resolved_fs_name = cct->_conf.get_val<std::string>("client_mds_namespace");
} else {
resolved_fs_name = fs_name;
}
void Client::_close_sessions()
{
+ for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
+ if (it->second.state == MetaSession::STATE_REJECTED)
+ mds_sessions.erase(it++);
+ else
+ ++it;
+ }
+
while (!mds_sessions.empty()) {
// send session closes!
for (auto &p : mds_sessions) {
if (p.second.state != MetaSession::STATE_CLOSING) {
_close_mds_session(&p.second);
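+ // track the rank so the wait below can time out on unresponsive MDSs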
+ mds_ranks_closing.insert(p.first);
}
}
// wait for sessions to close
- ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
- mount_cond.Wait(client_lock);
+ double timo = cct->_conf.get_val<std::chrono::seconds>("client_shutdown_timeout").count();
+ ldout(cct, 2) << "waiting for " << mds_ranks_closing.size() << " mds session(s) to close (timeout: "
+ << timo << "s)" << dendl;
+ std::unique_lock l{client_lock, std::adopt_lock};
+ if (!timo) {
+ mount_cond.wait(l);
+ } else if (!mount_cond.wait_for(l, ceph::make_timespan(timo), [this] { return mds_ranks_closing.empty(); })) {
+ ldout(cct, 1) << mds_ranks_closing.size() << " mds(s) did not respond to session close -- timing out." << dendl;
+ while (!mds_ranks_closing.empty()) {
+ auto& session = mds_sessions.at(*mds_ranks_closing.begin());
+ // this prunes entry from mds_sessions and mds_ranks_closing
+ _closed_mds_session(&session, -ETIMEDOUT);
+ }
+ }
+
+ mds_ranks_closing.clear();
+ l.release();
}
}
// will crash if they see an unknown CEPH_SESSION_* value in this msg.
const uint64_t features = session->con->get_features();
if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
- auto m = MClientSession::create(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
+ auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_FLUSH_MDLOG);
session->con->send_message2(std::move(m));
}
}
req->abort(err);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
}
// Force-close all sessions
while(!mds_sessions.empty()) {
auto& session = mds_sessions.begin()->second;
- _closed_mds_session(&session);
+ _closed_mds_session(&session, err);
}
}
void Client::_unmount(bool abort)
{
+ std::unique_lock lock{client_lock, std::adopt_lock};
if (unmounting)
return;
flush_mdlog_sync();
}
- while (!mds_requests.empty()) {
- ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
- mount_cond.Wait(client_lock);
- }
-
+ mount_cond.wait(lock, [this] {
+ if (!mds_requests.empty()) {
+ ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+ << dendl;
+ }
+ return mds_requests.empty();
+ });
if (tick_event)
timer.cancel_event(tick_event);
tick_event = 0;
_ll_drop_pins();
- while (unsafe_sync_write > 0) {
- ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting" << dendl;
- mount_cond.Wait(client_lock);
- }
+ mount_cond.wait(lock, [this] {
+ if (unsafe_sync_write > 0) {
+ ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+ << dendl;
+ }
+ return unsafe_sync_write <= 0;
+ });
if (cct->_conf->client_oc) {
// flush/release all buffered data
<< "+" << inode_map.size() << " items"
<< ", waiting (for caps to release?)"
<< dendl;
- utime_t until = ceph_clock_now() + utime_t(5, 0);
- int r = mount_cond.WaitUntil(client_lock, until);
- if (r == ETIMEDOUT) {
+ if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+ r == std::cv_status::timeout) {
dump_cache(NULL);
}
}
mounted = false;
+ lock.release();
ldout(cct, 2) << "unmounted." << dendl;
}
ldout(cct, 21) << "tick" << dendl;
tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
// Called back via Timer, which takes client_lock for us
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
tick();
}));
utime_t now = ceph_clock_now();
req->abort(-ETIMEDOUT);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
signal_cond_list(waiting_for_mdsmap);
for (auto &p : mds_sessions) {
}
trim_cache(true);
+
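+ // if blacklisted, retry stale sessions automatically every 30 minutes
+ // when client_reconnect_stale is enabled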
+ if (blacklisted && mounted &&
+ last_auto_reconnect + 30 * 60 < now &&
+ cct->_conf.get_val<bool>("client_reconnect_stale")) {
+ messenger->client_reset();
+ fd_gen++; // invalidate open files
+ blacklisted = false;
+ _kick_stale_sessions();
+ last_auto_reconnect = now;
+ }
}
void Client::renew_caps()
ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
session->last_cap_renew_request = ceph_clock_now();
uint64_t seq = ++session->cap_renew_seq;
- session->con->send_message2(MClientSession::create(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
+ session->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_RENEWCAPS, seq));
}
{
int r = 0;
Dentry *dn = NULL;
+ // can only request shared caps
+ mask &= CEPH_CAP_ANY_SHARED | CEPH_STAT_RSTAT;
if (dname == "..") {
if (dir->dentries.empty()) {
int r = make_request(req, perms, &tmptarget, NULL, rand() % mdsmap->get_num_in_mds());
if (r == 0) {
- Inode *tempino = tmptarget.get();
- _ll_get(tempino);
- *target = tempino;
+ *target = std::move(tmptarget);
ldout(cct, 8) << __func__ << " found target " << (*target)->ino << dendl;
} else {
*target = dir;
mask |= CEPH_CAP_AUTH_SHARED;
if (want & (CEPH_STATX_NLINK|CEPH_STATX_CTIME|CEPH_STATX_VERSION))
mask |= CEPH_CAP_LINK_SHARED;
- if (want & (CEPH_STATX_ATIME|CEPH_STATX_MTIME|CEPH_STATX_CTIME|CEPH_STATX_SIZE|CEPH_STATX_BLOCKS|CEPH_STATX_VERSION))
+ if (want & (CEPH_STATX_NLINK|CEPH_STATX_ATIME|CEPH_STATX_MTIME|CEPH_STATX_CTIME|CEPH_STATX_SIZE|CEPH_STATX_BLOCKS|CEPH_STATX_VERSION))
mask |= CEPH_CAP_FILE_SHARED;
if (want & (CEPH_STATX_VERSION|CEPH_STATX_CTIME))
mask |= CEPH_CAP_XATTR_SHARED;
int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
int caps, bool getref)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
<< " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
<< dendl;
string dn_name;
while (true) {
+ int mask = caps;
if (!dirp->inode->is_complete_and_ordered())
return -EAGAIN;
if (pd == dir->readdir_cache.end())
}
int idx = pd - dir->readdir_cache.begin();
- int r = _getattr(dn->inode, caps, dirp->perms);
+ if (dn->inode->is_dir()) {
+ mask |= CEPH_STAT_RSTAT;
+ }
+ int r = _getattr(dn->inode, mask, dirp->perms);
if (r < 0)
return r;
dn_name = dn->name; // fill in name while we have lock
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, in); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
<< " = " << r << dendl;
if (r < 0) {
uint64_t next_off = 1;
int r;
- r = _getattr(diri, caps, dirp->perms);
+ r = _getattr(diri, caps | CEPH_STAT_RSTAT, dirp->perms);
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
in = diri->get_first_parent()->dir->parent_inode;
int r;
- r = _getattr(in, caps, dirp->perms);
+ r = _getattr(in, caps | CEPH_STAT_RSTAT, dirp->perms);
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
int r;
if (check_caps) {
- r = _getattr(entry.inode, caps, dirp->perms);
+ int mask = caps;
+ if (entry.inode->is_dir()) {
+ mask |= CEPH_STAT_RSTAT;
+ }
+ r = _getattr(entry.inode, mask, dirp->perms);
if (r < 0)
return r;
}
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
<< " = " << r << dendl;
struct dirent *Client::readdir(dir_result_t *d)
{
int ret;
- static struct dirent de;
+ auto& de = d->de;
single_readdir sr;
sr.de = &de;
sr.stx = NULL;
mode_t mode, int stripe_unit, int stripe_count,
int object_size, const char *data_pool)
{
- ldout(cct, 3) << "open enter(" << relpath << ", " << ceph_flags_sys2wire(flags) << "," << mode << ")" << dendl;
+ int cflags = ceph_flags_sys2wire(flags);
+
+ ldout(cct, 3) << "open enter(" << relpath << ", " << cflags << "," << mode << ")" << dendl;
std::lock_guard lock(client_lock);
tout(cct) << "open" << std::endl;
tout(cct) << relpath << std::endl;
- tout(cct) << ceph_flags_sys2wire(flags) << std::endl;
+ tout(cct) << cflags << std::endl;
if (unmounting)
return -ENOTCONN;
bool created = false;
/* O_CREATE with O_EXCL enforces O_NOFOLLOW. */
bool followsym = !((flags & O_NOFOLLOW) || ((flags & O_CREAT) && (flags & O_EXCL)));
- int r = path_walk(path, &in, perms, followsym, ceph_caps_for_mode(mode));
+ int mask = ceph_caps_for_mode(ceph_flags_to_mode(cflags));
+
+ int r = path_walk(path, &in, perms, followsym, mask);
if (r == 0 && (flags & O_CREAT) && (flags & O_EXCL))
return -EEXIST;
out:
tout(cct) << r << std::endl;
- ldout(cct, 3) << "open exit(" << path << ", " << ceph_flags_sys2wire(flags) << ") = " << r << dendl;
+ ldout(cct, 3) << "open exit(" << path << ", " << cflags << ") = " << r << dendl;
return r;
}
* the resulting Inode object in one operation, so that caller
* can safely assume inode will still be there after return.
*/
-int Client::_lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+int Client::_lookup_vino(vinodeno_t vino, const UserPerm& perms, Inode **inode)
{
- ldout(cct, 8) << __func__ << " enter(" << ino << ")" << dendl;
+ ldout(cct, 8) << __func__ << " enter(" << vino << ")" << dendl;
if (unmounting)
return -ENOTCONN;
+ if (is_reserved_vino(vino))
+ return -ESTALE;
+
MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPINO);
- filepath path(ino);
+ filepath path(vino.ino);
req->set_filepath(path);
+ /*
+ * The MDS expects either a "real" snapid here or 0. The special value
+ * carveouts for the snapid are all at the end of the range so we can
+ * just look for any snapid below this value.
+ */
+ if (vino.snapid < CEPH_NOSNAP)
+ req->head.args.lookupino.snapid = vino.snapid;
+
int r = make_request(req, perms, NULL, NULL, rand() % mdsmap->get_num_in_mds());
if (r == 0 && inode != NULL) {
- vinodeno_t vino(ino, CEPH_NOSNAP);
unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
ceph_assert(p != inode_map.end());
*inode = p->second;
_ll_get(*inode);
}
- ldout(cct, 8) << __func__ << " exit(" << ino << ") = " << r << dendl;
+ ldout(cct, 8) << __func__ << " exit(" << vino << ") = " << r << dendl;
return r;
}
int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
{
+ vinodeno_t vino(ino, CEPH_NOSNAP);
std::lock_guard lock(client_lock);
- return _lookup_ino(ino, perms, inode);
+ return _lookup_vino(vino, perms, inode);
}
/**
Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
{
ceph_assert(in);
- Fh *f = new Fh(in, flags, cmode, perms);
+ Fh *f = new Fh(in, flags, cmode, fd_gen, perms);
ldout(cct, 10) << __func__ << " " << in->ino << " mode " << cmode << dendl;
if (cmode & CEPH_FILE_MODE_RD)
need |= CEPH_CAP_FILE_RD;
- result = get_caps(in, need, want, &have, -1);
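+ // get_caps() now takes an Fh (so it can check generation and file locks);
+ // no Fh exists yet at this point, so build a temporary one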
+ Fh fh(in, flags, cmode, fd_gen, perms);
+ result = get_caps(&fh, need, want, &have, -1);
if (result < 0) {
ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
" . Denying open: " <<
cpp_strerror(result) << dendl;
- in->put_open_ref(cmode);
} else {
put_cap_ref(in, need);
}
loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
{
Inode *in = f->inode.get();
- int r;
+ bool whence_check = false;
loff_t pos = -1;
- if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
- r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
- if (r < 0) {
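+ // these whence modes reference EOF or data extents, so refresh the file
+ // size first; SEEK_DATA/SEEK_HOLE are guarded since not every platform
+ // defines them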
+ switch (whence) {
+ case SEEK_END:
+ whence_check = true;
+ break;
+
+#ifdef SEEK_DATA
+ case SEEK_DATA:
+ whence_check = true;
+ break;
+#endif
+
+#ifdef SEEK_HOLE
+ case SEEK_HOLE:
+ whence_check = true;
+ break;
+#endif
+ }
+
+ if (whence_check) {
+ int r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+ if (r < 0)
return r;
- }
}
switch (whence) {
pos = in->size + offset;
break;
+#ifdef SEEK_DATA
case SEEK_DATA:
- if (offset < 0 || offset >= in->size) {
- r = -ENXIO;
- return offset;
- }
+ if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+ return -ENXIO;
pos = offset;
break;
+#endif
+#ifdef SEEK_HOLE
case SEEK_HOLE:
- if (offset < 0 || offset >= in->size) {
- r = -ENXIO;
- pos = offset;
- } else {
- pos = in->size;
- }
+ if (offset < 0 || static_cast<uint64_t>(offset) >= in->size)
+ return -ENXIO;
+ pos = in->size;
break;
+#endif
default:
ldout(cct, 1) << __func__ << ": invalid whence value " << whence << dendl;
ldout(cct, 10) << __func__ << " " << f << dendl;
if (f->pos_locked || !f->pos_waiters.empty()) {
- Cond cond;
+ ceph::condition_variable cond;
f->pos_waiters.push_back(&cond);
ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
- while (f->pos_locked || f->pos_waiters.front() != &cond)
- cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [f, me=&cond] {
+ return !f->pos_locked && f->pos_waiters.front() == me;
+ });
+ l.release();
ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
ceph_assert(f->pos_waiters.front() == &cond);
f->pos_waiters.pop_front();
void Client::unlock_fh_pos(Fh *f)
{
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
+
ldout(cct, 10) << __func__ << " " << f << dendl;
f->pos_locked = false;
+ if (!f->pos_waiters.empty()) {
+ // only wake up the oldest waiter
+ auto cond = f->pos_waiters.front();
+ cond->notify_one();
+ }
}
int Client::uninline_data(Inode *in, Context *onfinish)
int Client::read(int fd, char *buf, loff_t size, loff_t offset)
{
- std::lock_guard lock(client_lock);
+ std::unique_lock lock(client_lock);
tout(cct) << "read" << std::endl;
tout(cct) << fd << std::endl;
tout(cct) << size << std::endl;
int r = _read(f, offset, size, &bl);
ldout(cct, 3) << "read(" << fd << ", " << (void*)buf << ", " << size << ", " << offset << ") = " << r << dendl;
if (r >= 0) {
- bl.copy(0, bl.length(), buf);
+ lock.unlock();
+ bl.begin().copy(bl.length(), buf);
r = bl.length();
}
return r;
int want, have = 0;
bool movepos = false;
std::unique_ptr<C_SaferCond> onuninline;
- int64_t r = 0;
+ int64_t rc = 0;
const auto& conf = cct->_conf;
Inode *in = f->inode.get();
utime_t lat;
loff_t start_pos = offset;
if (in->inline_version == 0) {
- r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
+ auto r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
if (r < 0) {
+ rc = r;
goto done;
}
ceph_assert(in->inline_version > 0);
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- r = get_caps(in, CEPH_CAP_FILE_RD, want, &have, -1);
- if (r < 0) {
- goto done;
+ {
+ auto r = get_caps(f, CEPH_CAP_FILE_RD, want, &have, -1);
+ if (r < 0) {
+ rc = r;
+ goto done;
+ }
}
if (f->flags & O_DIRECT)
have &= ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO);
bl->substr_of(in->inline_data, offset, len - offset);
bl->append_zero(endoff - len);
}
- r = endoff - offset;
+ rc = endoff - offset;
} else if ((uint64_t)offset < endoff) {
bl->append_zero(endoff - offset);
- r = endoff - offset;
+ rc = endoff - offset;
} else {
- r = 0;
+ rc = 0;
}
goto success;
}
if (f->flags & O_RSYNC) {
_flush_range(in, offset, size);
}
- r = _read_async(f, offset, size, bl);
- if (r < 0)
+ rc = _read_async(f, offset, size, bl);
+ if (rc < 0)
goto done;
} else {
if (f->flags & O_DIRECT)
_flush_range(in, offset, size);
bool checkeof = false;
- r = _read_sync(f, offset, size, bl, &checkeof);
- if (r < 0)
+ rc = _read_sync(f, offset, size, bl, &checkeof);
+ if (rc < 0)
goto done;
if (checkeof) {
- offset += r;
- size -= r;
+ offset += rc;
+ size -= rc;
put_cap_ref(in, CEPH_CAP_FILE_RD);
have = 0;
// reverify size
- r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
- if (r < 0)
- goto done;
+ {
+ auto r = _getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+ if (r < 0) {
+ rc = r;
+ goto done;
+ }
+ }
// eof? short read.
if ((uint64_t)offset < in->size)
}
success:
- ceph_assert(r >= 0);
+ ceph_assert(rc >= 0);
if (movepos) {
// adjust fd pos
- f->pos = start_pos + r;
+ f->pos = start_pos + rc;
}
lat = ceph_clock_now();
// done!
if (onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
in->mark_caps_dirty(CEPH_CAP_FILE_WR);
check_caps(in, 0);
} else
- r = ret;
+ rc = ret;
}
if (have) {
put_cap_ref(in, CEPH_CAP_FILE_RD);
if (movepos) {
unlock_fh_pos(f);
}
- return r;
+ return rc;
}
Client::C_Readahead::C_Readahead(Client *c, Fh *f) :
off, len, bl, 0, &onfinish);
if (r == 0) {
get_cap_ref(in, CEPH_CAP_FILE_CACHE);
- client_lock.Unlock();
+ client_lock.unlock();
r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
}
ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
- Mutex flock("Client::_read_sync flock");
- Cond cond;
while (left > 0) {
C_SaferCond onfinish("Client::_read_sync flock");
bufferlist tbl;
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
// if we get ENOENT from OSD, assume 0 bytes returned
if (r == -ENOENT)
ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
if (unsafe_sync_write == 0 && unmounting) {
ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
}
}
if (r <= 0)
return r;
- int bufoff = 0;
+ auto iter = bl.cbegin();
for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
/*
* Handle the case where the bufferlist does not have enough data to
* fill the iovs completely.
*/
- if (resid < iov[j].iov_len) {
- bl.copy(bufoff, resid, (char *)iov[j].iov_base);
- break;
- } else {
- bl.copy(bufoff, iov[j].iov_len, (char *)iov[j].iov_base);
- }
- resid -= iov[j].iov_len;
- bufoff += iov[j].iov_len;
+ const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
+ iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
+ resid -= round_size;
+ /* iter is self-updating */
}
return r;
}
* change out from under us.
*/
if (f->flags & O_APPEND) {
- int r = _lseek(f, 0, SEEK_END);
+ auto r = _lseek(f, 0, SEEK_END);
if (r < 0) {
unlock_fh_pos(f);
return r;
if (f->mode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
- int r = get_caps(in, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
+ int r = get_caps(f, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
if (r < 0)
return r;
uint32_t len = in->inline_data.length();
if (endoff < len)
- in->inline_data.copy(endoff, len - endoff, bl);
+ in->inline_data.begin(endoff).copy(len - endoff, bl); // XXX
if (offset < len)
in->inline_data.splice(offset, len - offset);
offset, size, bl, ceph::real_clock::now(), 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
- onfinish.wait();
- client_lock.Lock();
+ client_lock.unlock();
+ r = onfinish.wait();
+ client_lock.lock();
_sync_write_commit(in);
+ if (r < 0)
+ goto done;
}
// if we get here, the write succeeded; update client metadata
done:
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int uninline_ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
in->inline_data.clear();
if (f->flags & O_PATH)
return -EBADF;
#endif
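+ // truncating requires the fd to be open for writing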
+ if ((f->mode & CEPH_FILE_MODE_WR) == 0)
+ return -EBADF;
struct stat attr;
attr.st_size = length;
return _setattr(f->inode, &attr, CEPH_SETATTR_SIZE, perms);
}
if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << "waiting on data to flush" << dendl;
r = object_cacher_completion->wait();
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
} else {
// FIXME: this can starve
objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
}
- client_lock.Unlock();
+ client_lock.unlock();
int rval = cond.wait();
ceph_assert(root);
total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
- client_lock.Lock();
+ client_lock.lock();
if (rval < 0) {
ldout(cct, 1) << "underlying call to statfs returned error: "
<< " type " << fl->l_type << " owner " << owner
<< " " << fl->l_start << "~" << fl->l_len << dendl;
+ if (in->flags & I_ERROR_FILELOCK)
+ return -EIO;
+
int lock_cmd;
if (F_RDLCK == fl->l_type)
lock_cmd = CEPH_LOCK_SHARED;
Inode *in = fh->inode.get();
ldout(cct, 10) << __func__ << " " << fh << " ino " << in->ino << dendl;
+ list<ceph_filelock> activated_locks;
+
list<pair<int, ceph_filelock> > to_release;
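+ // if the inode's lock state was lost with its MDS session (I_ERROR_FILELOCK), drop held locks locally; otherwise queue them for an explicit release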
if (fh->fcntl_locks) {
auto &lock_state = fh->fcntl_locks;
- for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
- p != lock_state->held_locks.end();
- ++p)
- to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, p->second));
+ for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+ auto q = p++;
+ if (in->flags & I_ERROR_FILELOCK) {
+ lock_state->remove_lock(q->second, activated_locks);
+ } else {
+ to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, q->second));
+ }
+ }
lock_state.reset();
}
if (fh->flock_locks) {
auto &lock_state = fh->flock_locks;
- for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
- p != lock_state->held_locks.end();
- ++p)
- to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, p->second));
+ for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+ auto q = p++;
+ if (in->flags & I_ERROR_FILELOCK) {
+ lock_state->remove_lock(q->second, activated_locks);
+ } else {
+ to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, q->second));
+ }
+ }
lock_state.reset();
}
- if (to_release.empty())
- return;
+ if ((in->flags & I_ERROR_FILELOCK) && !in->has_any_filelocks())
+ in->flags &= ~I_ERROR_FILELOCK;
- // mds has already released filelocks if session was closed.
- if (in->caps.empty())
+ if (to_release.empty())
return;
struct flock fl;
return statfs(0, stbuf, perms);
}
-void Client::ll_register_callbacks(struct client_callback_args *args)
+void Client::ll_register_callbacks(struct ceph_client_callback_args *args)
{
if (!args)
return;
remount_cb = args->remount_cb;
remount_finisher.start();
}
- umask_cb = args->umask_cb;
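+ // inode release callbacks run on their own finisher thread, started only when a hook is registered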
+ if (args->ino_release_cb) {
+ ino_release_cb = args->ino_release_cb;
+ async_ino_releasor.start();
+ }
+ if (args->umask_cb)
+ umask_cb = args->umask_cb;
}
int Client::test_dentry_handling(bool can_invalidate)
wait_sync_caps(flush_tid);
if (nullptr != cond) {
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
cond->wait();
ldout(cct, 15) << __func__ << " flush finished" << dendl;
- client_lock.Lock();
+ client_lock.lock();
}
return 0;
in->mtime = diri->mtime;
in->ctime = diri->ctime;
in->btime = diri->btime;
+ in->atime = diri->atime;
in->size = diri->size;
in->change_attr = diri->change_attr;
return r;
}
-int Client::ll_lookup_inode(
- struct inodeno_t ino,
+int Client::ll_lookup_vino(
+ vinodeno_t vino,
const UserPerm& perms,
Inode **inode)
{
ceph_assert(inode != NULL);
- std::lock_guard lock(client_lock);
- ldout(cct, 3) << "ll_lookup_inode " << ino << dendl;
-
+
if (unmounting)
return -ENOTCONN;
- // Num1: get inode and *inode
- int r = _lookup_ino(ino, perms, inode);
- if (r)
- return r;
-
- ceph_assert(*inode != NULL);
+ if (is_reserved_vino(vino))
+ return -ESTALE;
- if (!(*inode)->dentries.empty()) {
- ldout(cct, 8) << __func__ << " dentry already present" << dendl;
+ std::lock_guard lock(client_lock);
+ ldout(cct, 3) << __func__ << " " << vino << dendl;
+
+ // Check the cache first
+ unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
+ if (p != inode_map.end()) {
+ *inode = p->second;
+ _ll_get(*inode);
return 0;
}
- if ((*inode)->is_root()) {
- ldout(cct, 8) << "ino is root, no parent" << dendl;
- return 0;
- }
+ uint64_t snapid = vino.snapid;
- // Num2: Request the parent inode, so that we can look up the name
- Inode *parent;
- r = _lookup_parent(*inode, perms, &parent);
- if (r) {
- _ll_forget(*inode, 1);
+ // for snapdir, find the non-snapped dir inode
+ if (snapid == CEPH_SNAPDIR)
+ vino.snapid = CEPH_NOSNAP;
+
+ int r = _lookup_vino(vino, perms, inode);
+ if (r)
return r;
- }
+ ceph_assert(*inode != NULL);
- ceph_assert(parent != NULL);
+ if (snapid == CEPH_SNAPDIR) {
+ Inode *tmp = *inode;
- // Num3: Finally, get the name (dentry) of the requested inode
- r = _lookup_name(*inode, parent, perms);
- if (r) {
- // Unexpected error
- _ll_forget(parent, 1);
- _ll_forget(*inode, 1);
- return r;
+ // open the snapdir and put the inode ref
+ *inode = open_snapdir(tmp);
+ _ll_forget(tmp, 1);
+ _ll_get(*inode);
}
-
- _ll_forget(parent, 1);
return 0;
}
+int Client::ll_lookup_inode(
+ struct inodeno_t ino,
+ const UserPerm& perms,
+ Inode **inode)
+{
+ vinodeno_t vino(ino, CEPH_NOSNAP);
+ return ll_lookup_vino(vino, perms, inode);
+}
+
int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
struct ceph_statx *stx, unsigned want, unsigned flags,
const UserPerm& perms)
if (unmounting)
return NULL;
+ if (is_reserved_vino(vino))
+ return NULL;
+
unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
if (p == inode_map.end())
return NULL;
if (vxattr->flags & VXATTR_RSTAT) {
flags |= CEPH_STAT_RSTAT;
}
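+ // dirstat-backed vxattrs (entries/files/subdirs) need up-to-date directory stats, so ask for Fs (CEPH_CAP_FILE_SHARED) in the getattr mask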
+ if (vxattr->flags & VXATTR_DIRSTAT) {
+ flags |= CEPH_CAP_FILE_SHARED;
+ }
r = _getattr(in, flags, perms, true);
if (r != 0) {
// Error from getattr!
name += this_len;
size -= this_len;
}
-
- const VXattr *vxattr;
- for (vxattr = _get_vxattrs(in); vxattr && !vxattr->name.empty(); vxattr++) {
- if (vxattr->hidden)
- continue;
- // call pointer-to-member function
- if (vxattr->exists_cb && !(this->*(vxattr->exists_cb))(in))
- continue;
-
- size_t this_len = vxattr->name.length() + 1;
- r += this_len;
- if (len_only)
- continue;
-
- if (this_len > size) {
- r = -ERANGE;
- goto out;
- }
-
- memcpy(name, vxattr->name.c_str(), this_len);
- name += this_len;
- size -= this_len;
- }
out:
ldout(cct, 8) << __func__ << "(" << in->ino << ", " << size << ") = " << r << dendl;
return r;
return -EROFS;
}
+ if (size == 0) {
+ value = "";
+ } else if (value == NULL) {
+ return -EINVAL;
+ }
+
bool posix_acl_xattr = false;
if (acl_type == POSIX_ACL)
posix_acl_xattr = !strncmp(name, "system.", 7);
bool Client::_vxattrcb_quota_exists(Inode *in)
{
return in->quota.is_enable() &&
- in->snaprealm && in->snaprealm->ino == in->ino;
+ (in->snapid != CEPH_NOSNAP ||
+ (in->snaprealm && in->snaprealm->ino == in->ino));
}
size_t Client::_vxattrcb_quota(Inode *in, char *val, size_t size)
{
(long unsigned)in->snap_btime.nsec());
}
+size_t Client::_vxattrcb_cluster_fsid(Inode *in, char *val, size_t size)
+{
+ return snprintf(val, size, "%s", monclient->get_fsid().to_string().c_str());
+}
+
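+ // expose this client's entity name (e.g. "client.4161") as a vxattr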
+size_t Client::_vxattrcb_client_id(Inode *in, char *val, size_t size)
+{
+ auto name = messenger->get_myname();
+ return snprintf(val, size, "%s%" PRId64, name.type_str(), name.num());
+}
+
#define CEPH_XATTR_NAME(_type, _name) "ceph." #_type "." #_name
#define CEPH_XATTR_NAME2(_type, _name, _name2) "ceph." #_type "." #_name "." #_name2
-#define XATTR_NAME_CEPH(_type, _name) \
-{ \
- name: CEPH_XATTR_NAME(_type, _name), \
- getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
- readonly: true, \
- hidden: false, \
- exists_cb: NULL, \
- flags: 0, \
-}
-#define XATTR_NAME_CEPH2(_type, _name, _flags) \
+#define XATTR_NAME_CEPH(_type, _name, _flags) \
{ \
name: CEPH_XATTR_NAME(_type, _name), \
getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
readonly: true, \
- hidden: false, \
exists_cb: NULL, \
flags: _flags, \
}
name: CEPH_XATTR_NAME2(_type, _name, _field), \
getxattr_cb: &Client::_vxattrcb_ ## _name ## _ ## _field, \
readonly: false, \
- hidden: true, \
exists_cb: &Client::_vxattrcb_layout_exists, \
flags: 0, \
}
name: CEPH_XATTR_NAME(_type, _name), \
getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name, \
readonly: false, \
- hidden: true, \
exists_cb: &Client::_vxattrcb_quota_exists, \
flags: 0, \
}
name: "ceph.dir.layout",
getxattr_cb: &Client::_vxattrcb_layout,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_layout_exists,
flags: 0,
},
XATTR_LAYOUT_FIELD(dir, layout, object_size),
XATTR_LAYOUT_FIELD(dir, layout, pool),
XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
- XATTR_NAME_CEPH(dir, entries),
- XATTR_NAME_CEPH(dir, files),
- XATTR_NAME_CEPH(dir, subdirs),
- XATTR_NAME_CEPH2(dir, rentries, VXATTR_RSTAT),
- XATTR_NAME_CEPH2(dir, rfiles, VXATTR_RSTAT),
- XATTR_NAME_CEPH2(dir, rsubdirs, VXATTR_RSTAT),
- XATTR_NAME_CEPH2(dir, rbytes, VXATTR_RSTAT),
- XATTR_NAME_CEPH2(dir, rctime, VXATTR_RSTAT),
+ XATTR_NAME_CEPH(dir, entries, VXATTR_DIRSTAT),
+ XATTR_NAME_CEPH(dir, files, VXATTR_DIRSTAT),
+ XATTR_NAME_CEPH(dir, subdirs, VXATTR_DIRSTAT),
+ XATTR_NAME_CEPH(dir, rentries, VXATTR_RSTAT),
+ XATTR_NAME_CEPH(dir, rfiles, VXATTR_RSTAT),
+ XATTR_NAME_CEPH(dir, rsubdirs, VXATTR_RSTAT),
+ XATTR_NAME_CEPH(dir, rbytes, VXATTR_RSTAT),
+ XATTR_NAME_CEPH(dir, rctime, VXATTR_RSTAT),
{
name: "ceph.quota",
getxattr_cb: &Client::_vxattrcb_quota,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_quota_exists,
flags: 0,
},
name: "ceph.dir.pin",
getxattr_cb: &Client::_vxattrcb_dir_pin,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_dir_pin_exists,
flags: 0,
},
name: "ceph.snap.btime",
getxattr_cb: &Client::_vxattrcb_snap_btime,
readonly: true,
- hidden: false,
exists_cb: &Client::_vxattrcb_snap_btime_exists,
flags: 0,
},
name: "ceph.file.layout",
getxattr_cb: &Client::_vxattrcb_layout,
readonly: false,
- hidden: true,
exists_cb: &Client::_vxattrcb_layout_exists,
flags: 0,
},
name: "ceph.snap.btime",
getxattr_cb: &Client::_vxattrcb_snap_btime,
readonly: true,
- hidden: false,
exists_cb: &Client::_vxattrcb_snap_btime_exists,
flags: 0,
},
{ name: "" } /* Required table terminator */
};
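+// vxattrs available on every inode, regardless of file type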
+const Client::VXattr Client::_common_vxattrs[] = {
+ {
+ name: "ceph.cluster_fsid",
+ getxattr_cb: &Client::_vxattrcb_cluster_fsid,
+ readonly: true,
+ exists_cb: nullptr,
+ flags: 0,
+ },
+ {
+ name: "ceph.client_id",
+ getxattr_cb: &Client::_vxattrcb_client_id,
+ readonly: true,
+ exists_cb: nullptr,
+ flags: 0,
+ },
+ { name: "" } /* Required table terminator */
+};
+
const Client::VXattr *Client::_get_vxattrs(Inode *in)
{
if (in->is_dir())
vxattr++;
}
}
+
+ // for common vxattrs
+ vxattr = _common_vxattrs;
+ while (!vxattr->name.empty()) {
+ if (vxattr->name == name)
+ return vxattr;
+ vxattr++;
+ }
}
+
return NULL;
}
else
return -EROFS;
}
- if (fromdir != todir) {
- Inode *fromdir_root =
- fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
- Inode *todir_root =
- todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
- if (fromdir_root != todir_root) {
- return -EXDEV;
- }
- }
InodeRef target;
MetaRequest *req = new MetaRequest(op);
req->dentry_unless = CEPH_CAP_FILE_EXCL;
InodeRef oldin, otherin;
- res = _lookup(fromdir, fromname, 0, &oldin, perm);
+ Inode *fromdir_root = nullptr;
+ Inode *todir_root = nullptr;
+ int mask = 0;
+ bool quota_check = false;
+ if (fromdir != todir) {
+ fromdir_root =
+ fromdir->quota.is_enable() ? fromdir : get_quota_root(fromdir, perm);
+ todir_root =
+ todir->quota.is_enable() ? todir : get_quota_root(todir, perm);
+
+ if (todir_root->quota.is_enable() && fromdir_root != todir_root) {
+ // use CEPH_STAT_RSTAT mask to force send getattr or lookup request
+ // to auth MDS to get latest rstat for todir_root and source dir
+ // even if their dentry caches and inode caps are satisfied.
+ res = _getattr(todir_root, CEPH_STAT_RSTAT, perm, true);
+ if (res < 0)
+ goto fail;
+
+ quota_check = true;
+ if (oldde->inode && oldde->inode->is_dir()) {
+ mask |= CEPH_STAT_RSTAT;
+ }
+ }
+ }
+
+ res = _lookup(fromdir, fromname, mask, &oldin, perm);
if (res < 0)
goto fail;
req->set_old_inode(oldinode);
req->old_inode_drop = CEPH_CAP_LINK_SHARED;
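+ // with fresh rstats fetched above, estimate whether moving oldinode into todir_root would exceed its byte or file quota before asking the MDS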
+ if (quota_check) {
+ int64_t old_bytes, old_files;
+ if (oldinode->is_dir()) {
+ old_bytes = oldinode->rstat.rbytes;
+ old_files = oldinode->rstat.rsize();
+ } else {
+ old_bytes = oldinode->size;
+ old_files = 1;
+ }
+
+ bool quota_exceed = false;
+ if (todir_root && todir_root->quota.max_bytes &&
+ (old_bytes + todir_root->rstat.rbytes) >= todir_root->quota.max_bytes) {
+ ldout(cct, 10) << "_rename (" << oldinode->ino << " bytes="
+ << old_bytes << ") to (" << todir->ino
+ << ") will exceed quota on " << *todir_root << dendl;
+ quota_exceed = true;
+ }
+
+ if (todir_root && todir_root->quota.max_files &&
+ (old_files + todir_root->rstat.rsize()) >= todir_root->quota.max_files) {
+ ldout(cct, 10) << "_rename (" << oldinode->ino << " files="
+ << old_files << ") to (" << todir->ino
+ << ") will exceed quota on " << *todir_root << dendl;
+ quota_exceed = true;
+ }
+
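+ // a directory move across quota roots reports EXDEV (letting tools like mv fall back to copy+unlink); a plain file reports EDQUOT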
+ if (quota_exceed) {
+ res = (oldinode->is_dir()) ? -EXDEV : -EDQUOT;
+ goto fail;
+ }
+ }
+
res = _lookup(todir, toname, 0, &otherin, perm);
switch (res) {
case 0:
/* We can't return bytes written larger than INT_MAX, clamp len to that */
len = std::min(len, (loff_t)INT_MAX);
- return _read(fh, off, len, bl);
+ int r = _read(fh, off, len, bl);
+ ldout(cct, 3) << "ll_read " << fh << " " << off << "~" << len << " = " << r
+ << dendl;
+ return r;
}
int Client::ll_read_block(Inode *in, uint64_t blockid,
CEPH_OSD_FLAG_READ,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
if (r >= 0) {
- bl.copy(0, bl.length(), buf);
+ bl.begin().copy(bl.length(), buf);
r = bl.length();
}
fakesnap.seq = snapseq;
/* lock just in time */
- client_lock.Lock();
+ client_lock.lock();
if (unmounting) {
- client_lock.Unlock();
+ client_lock.unlock();
return -ENOTCONN;
}
0,
onsafe.get());
- client_lock.Unlock();
+ client_lock.unlock();
if (nullptr != onsafe) {
r = onsafe->wait();
}
}
int have;
- int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
+ int r = get_caps(fh, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
if (r < 0)
return r;
if (in->inline_version < CEPH_INLINE_NONE &&
(have & CEPH_CAP_FILE_BUFFER)) {
bufferlist bl;
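+ // punch the hole within the inline blob: keep [0, offset), zero the punched range, then copy any tail that remains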
+ auto inline_iter = in->inline_data.cbegin();
int len = in->inline_data.length();
if (offset < len) {
if (offset > 0)
- in->inline_data.copy(0, offset, bl);
+ inline_iter.copy(offset, bl);
int size = length;
if (offset + size > len)
size = len - offset;
if (size > 0)
bl.append_zero(size);
- if (offset + size < len)
- in->inline_data.copy(offset + size, len - offset - size, bl);
+ if (offset + size < len) {
+ inline_iter += size;
+ inline_iter.copy(len - offset - size, bl);
+ }
in->inline_data = bl;
in->inline_version++;
}
in->change_attr++;
in->mark_caps_dirty(CEPH_CAP_FILE_WR);
- client_lock.Unlock();
+ client_lock.unlock();
onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
_sync_write_commit(in);
}
} else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
}
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
case MetaSession::STATE_OPEN:
{
objecter->maybe_request_map(); /* to check if we are blacklisted */
- const auto& conf = cct->_conf;
- if (conf->client_reconnect_stale) {
+ if (cct->_conf.get_val<bool>("client_reconnect_stale")) {
ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
_closed_mds_session(s);
} else {
return false;
}
-bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer)
-{
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
- *authorizer = monclient->build_authorizer(dest_type);
- return true;
-}
-
Inode *Client::get_quota_root(Inode *in, const UserPerm& perms)
{
Inode *quota_in = root_ancestor;
bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
{
+ ceph_assert(in->size >= in->reported_size);
+ const uint64_t size = in->size - in->reported_size;
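+ // bytes written since the last size report to the MDS; the lambda below treats quota as "approaching" once this exceeds 1/16 of the remaining headroom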
return check_quota_condition(in, perms,
- [](const Inode &in) {
+ [&size](const Inode &in) {
if (in.quota.max_bytes) {
if (in.rstat.rbytes >= in.quota.max_bytes) {
return true;
}
- ceph_assert(in.size >= in.reported_size);
const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
- const uint64_t size = in.size - in.reported_size;
return (space >> 4) < size;
} else {
return false;
if (!cct->_conf->client_check_pool_perm)
return 0;
+ /* Only need to do this for regular files */
+ if (!in->is_file())
+ return 0;
+
int64_t pool_id = in->layout.pool_id;
std::string pool_ns = in->layout.pool_ns;
std::pair<int64_t, std::string> perm_key(pool_id, pool_ns);
objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
- client_lock.Unlock();
+ client_lock.unlock();
int rd_ret = rd_cond.wait();
int wr_ret = wr_cond.wait();
- client_lock.Lock();
+ client_lock.lock();
bool errored = false;
MetaSession *session;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
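+ // a rejected session (e.g. this client was evicted) will never open; fail fast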
+ if (session->state == MetaSession::STATE_REJECTED)
+ return -EPERM;
if (session->state != MetaSession::STATE_OPENING) {
// umounting?
return -EINVAL;
}
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
- if (rejected_by_mds.count(mds))
- return -EPERM;
continue;
}
if (session->reclaim_state == MetaSession::RECLAIM_NULL ||
session->reclaim_state == MetaSession::RECLAIMING) {
session->reclaim_state = MetaSession::RECLAIMING;
- auto m = MClientReclaim::create(uuid, flags);
+ auto m = make_message<MClientReclaim>(uuid, flags);
session->con->send_message2(std::move(m));
wait_on_list(waiting_for_reclaim);
} else if (session->reclaim_state == MetaSession::RECLAIM_FAIL) {
C_SaferCond cond;
if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
- client_lock.Unlock();
+ client_lock.unlock();
cond.wait();
- client_lock.Lock();
+ client_lock.lock();
}
bool blacklisted = objecter->with_osdmap(
for (auto &p : mds_sessions) {
p.second.reclaim_state = MetaSession::RECLAIM_NULL;
- auto m = MClientReclaim::create("", MClientReclaim::FLAG_FINISH);
+ auto m = make_message<MClientReclaim>("", MClientReclaim::FLAG_FINISH);
p.second.con->send_message2(std::move(m));
}
mds_rank_t Client::_get_random_up_mds() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
std::set<mds_rank_t> up;
mdsmap->get_up_mds_set(up);
StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
- : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+ : Client(m, mc, new Objecter(m->cct, m, mc, nullptr))
{
monclient->set_messenger(m);
objecter->set_client_incarnation(0);
int StandaloneClient::init()
{
- timer.init();
- objectcacher->start();
+ _pre_init();
objecter->init();
- client_lock.Lock();
+ client_lock.lock();
ceph_assert(!is_initialized());
messenger->add_dispatcher_tail(objecter);
if (r < 0) {
// need to do cleanup because we're in an intermediate init state
timer.shutdown();
- client_lock.Unlock();
+ client_lock.unlock();
objecter->shutdown();
objectcacher->stop();
monclient->shutdown();
}
objecter->start();
- client_lock.Unlock();
+ client_lock.unlock();
_finish_init();
return 0;