#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
+#include "MDSContext.h"
#include "msg/Messenger.h"
#include "common/perf_counters.h"
#include "include/compat.h"
#include "osd/OSDMap.h"
+#include "fscrypt.h"
#include <errno.h>
#include "common/config.h"
+#include "msg/Message.h"
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
mds(m),
mdcache(mds->mdcache), mdlog(mds->mdlog),
+ inject_rename_corrupt_dentry_first(g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first")),
recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
metrics_handler(metrics_handler)
{
dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
bal_fragment_size_max = g_conf().get_val<int64_t>("mds_bal_fragment_size_max");
supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED);
+ supported_metric_spec = feature_bitset_t(CEPHFS_METRIC_FEATURES_ALL);
}
void Server::dispatch(const cref_t<Message> &m)
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
+ case CEPH_MSG_CLIENT_REPLY:
+ handle_client_reply(ref_cast<MClientReply>(m));
+ return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
handle_peer_request(ref_cast<MMDSPeerRequest>(m));
return;
default:
- derr << "server unknown message " << m->get_type() << dendl;
- ceph_abort_msg("server unknown message");
+ derr << "Server unknown message " << m->get_type() << " from peer type " << m->get_connection()->get_peer_type() << dendl;
+ ceph_abort_msg("server unknown message " + to_string(m->get_type()) + " from peer type " + to_string(m->get_connection()->get_peer_type()));
}
}
unsigned flags = m->get_flags();
if (flags != CEPH_RECLAIM_RESET) { // currently only support reset
dout(10) << __func__ << " unsupported flags" << dendl;
- reply->set_result(-CEPHFS_EOPNOTSUPP);
+ reply->set_result(-CEPHFS_EINVAL);
mds->send_message_client(reply, session);
return;
}
if (flags & CEPH_RECLAIM_RESET) {
finish_reclaim_session(session, reply);
- return;
- }
-
- ceph_abort();
+ } else ceph_assert(0); /* no other flags are handled at this time */
}
void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
void Server::handle_client_reclaim(const cref_t<MClientReclaim> &m)
{
Session *session = mds->get_session(m);
+ uint32_t flags = m->get_flags();
dout(3) << __func__ << " " << *m << " from " << m->get_source() << dendl;
- ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+ ceph_assert(m->is_a_client()); // should _not_ come from an mds!
if (!session) {
dout(0) << " ignoring sessionless msg " << *m << dendl;
return;
}
- if (m->get_flags() & MClientReclaim::FLAG_FINISH) {
+ if (flags & MClientReclaim::FLAG_FINISH) {
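+ // FLAG_FINISH is known to be set at this point, so flags ^ FLAG_FINISH
+ // is non-zero exactly when some other flag is set alongside it.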
+ if (flags ^ MClientReclaim::FLAG_FINISH) {
+ dout(0) << __func__ << " client specified FLAG_FINISH with other flags."
+ " Other flags:" << flags << dendl;
+ auto reply = make_message<MClientReclaimReply>(0);
+ reply->set_result(-CEPHFS_EINVAL);
+ mds->send_message_client(reply, session);
+ return;
+ }
finish_reclaim_session(session);
} else {
reclaim_session(session, m);
Session *session = mds->get_session(m);
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
- ceph_assert(m->get_source().is_client()); // should _not_ come from an mds!
+ ceph_assert(m->is_a_client()); // should _not_ come from an mds!
if (!session) {
dout(0) << " ignoring sessionless msg " << *m << dendl;
uint64_t sseq = 0;
switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
+ if (mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+ dout(0) << "new sessions are not permitted, enable again via "
+ "`ceph fs set <fs_name> refuse_client_session false`" << dendl;
+ auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
+ reply->metadata["error_string"] = "new sessions are not permitted,"
+ " enable again via `ceph fs set"
+ " <fs_name> refuse_client_session false`";
+ mds->send_message(reply, m->get_connection());
+ return;
+ }
if (session->is_opening() ||
session->is_open() ||
session->is_stale() ||
session->is_killing() ||
terminating_sessions) {
- dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
+ if (m->supported_features.test(CEPHFS_FEATURE_NOTIFY_SESSION_STATE)) {
+ if (session->is_open() && !mds->is_stopping()) {
+ dout(10) << "currently already opened" << dendl;
+
+ auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN,
+ session->get_push_seq());
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ reply->supported_features = supported_features;
+ mds->send_message_client(reply, session);
+ if (mdcache->is_readonly()) {
+ auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
+ mds->send_message_client(m, session);
+ }
+ }
+ }
+ dout(10) << "currently " << session->get_state_name()
+ << ", dropping this req" << dendl;
return;
}
ceph_assert(session->is_closed() || session->is_closing());
break;
default:
- ceph_abort();
+ derr << "Server received unknown session op " << m->get_op() << ", closing session and blocklisting the client " << session->get_client() << dendl;
+ auto reject = make_message<MClientSession>(CEPH_SESSION_REJECT);
+ mds->send_message_client(reject, session);
+ CachedStackStringStream css;
+ mds->evict_client(session->get_client().v, false, true, *css, nullptr);
}
}
metrics_handler->add_session(session);
ceph_assert(session->get_connection());
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
- if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
reply->supported_features = supported_features;
+ reply->metric_spec = supported_metric_spec;
+ }
mds->send_message_client(reply, session);
if (mdcache->is_readonly()) {
auto m = make_message<MClientSession>(CEPH_SESSION_FORCE_RO);
metrics_handler->add_session(session);
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
- if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
reply->supported_features = supported_features;
+ reply->metric_spec = supported_metric_spec;
+ }
mds->send_message_client(reply, session);
if (mdcache->is_readonly())
if (mds->locker->revoke_stale_caps(session)) {
mds->locker->remove_stale_leases(session);
finish_flush_session(session, session->get_push_seq());
- auto m = make_message<MClientSession>(CEPH_SESSION_STALE, session->get_push_seq());
+ auto m = make_message<MClientSession>(CEPH_SESSION_STALE);
mds->send_message_client(m, session);
} else {
to_evict.push_back(session);
if (changed.count("mds_alternate_name_max")) {
alternate_name_max = g_conf().get_val<Option::size_t>("mds_alternate_name_max");
}
+ if (changed.count("mds_fscrypt_last_block_max_size")) {
+ fscrypt_last_block_max_size = g_conf().get_val<Option::size_t>("mds_fscrypt_last_block_max_size");
+ }
if (changed.count("mds_dir_max_entries")) {
dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
dout(20) << __func__ << " max entries per directory changed to "
dout(20) << __func__ << " max fragment size changed to "
<< bal_fragment_size_max << dendl;
}
+ if (changed.count("mds_inject_rename_corrupt_dentry_first")) {
+ inject_rename_corrupt_dentry_first = g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first");
+ }
}
/*
}
}
-size_t Server::apply_blocklist(const std::set<entity_addr_t> &blocklist)
+size_t Server::apply_blocklist()
{
- bool prenautilus = mds->objecter->with_osdmap(
- [&](const OSDMap& o) {
- return o.require_osd_release < ceph_release_t::nautilus;
- });
-
std::vector<Session*> victims;
const auto& sessions = mds->sessionmap.get_sessions();
- for (const auto& p : sessions) {
- if (!p.first.is_client()) {
- // Do not apply OSDMap blocklist to MDS daemons, we find out
- // about their death via MDSMap.
- continue;
- }
-
- Session *s = p.second;
- auto inst_addr = s->info.inst.addr;
- // blocklist entries are always TYPE_ANY for nautilus+
- inst_addr.set_type(entity_addr_t::TYPE_ANY);
- if (blocklist.count(inst_addr)) {
- victims.push_back(s);
- continue;
- }
- if (prenautilus) {
- // ...except pre-nautilus, they were TYPE_LEGACY
- inst_addr.set_type(entity_addr_t::TYPE_LEGACY);
- if (blocklist.count(inst_addr)) {
- victims.push_back(s);
+ mds->objecter->with_osdmap(
+ [&](const OSDMap& o) {
+ for (const auto& p : sessions) {
+ if (!p.first.is_client()) {
+ // Do not apply OSDMap blocklist to MDS daemons, we find out
+ // about their death via MDSMap.
+ continue;
+ }
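+ // OSDMap::is_blocklisted() encapsulates the address matching that was
+ // open-coded above, including the pre-nautilus TYPE_LEGACY handling.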
+ if (o.is_blocklisted(p.second->info.inst.addr)) {
+ victims.push_back(p.second);
+ }
}
- }
- }
+ });
for (const auto& s : victims) {
kill_session(s, nullptr);
}

return victims.size();
}
+ if (mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+ mds->clog->warn() << "client could not reconnect as"
+ " file system flag refuse_client_session is set";
+ dout(0) << "client cannot reconnect when file system flag"
+ " refuse_client_session is set" << dendl;
+ auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
+ reply->metadata["error_string"] = "client cannot reconnect when file system flag"
+ " refuse_client_session is set";
+ mds->send_message(reply, m->get_connection());
+ return;
+ }
+
if (!session->is_open()) {
dout(0) << " ignoring msg from not-open session" << *m << dendl;
auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
metrics_handler->add_session(session);
// notify client of success with an OPEN
auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
- if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
reply->supported_features = supported_features;
+ reply->metric_spec = supported_metric_spec;
+ }
mds->send_message_client(reply, session);
mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
}
mdr->pin(dn);
early_reply(mdr, in, dn);
-
+
mdr->committing = true;
submit_mdlog_entry(le, fin, mdr, __func__);
-
+
if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
if (mds->queue_one_replay()) {
dout(10) << " queued next replay op" << dendl;
} else {
dout(10) << " journaled last replay op" << dendl;
}
- } else if (mdr->did_early_reply)
+ } else if (mdr->did_early_reply) {
mds->locker->drop_rdlocks_for_early_reply(mdr.get());
- else
+ if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
+ mdlog->flush();
+ } else {
mdlog->flush();
+ }
}
void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
mds->logger->inc(l_mds_reply);
utime_t lat = ceph_clock_now() - req->get_recv_stamp();
mds->logger->tinc(l_mds_reply_latency, lat);
+ if (lat >= g_conf()->mds_op_complaint_time) {
+ mds->logger->inc(l_mds_slow_reply);
+ }
if (client_inst.name.is_client()) {
mds->sessionmap.hit_session(mdr->session);
}
mds->logger->inc(l_mds_reply);
utime_t lat = ceph_clock_now() - mdr->client_request->get_recv_stamp();
mds->logger->tinc(l_mds_reply_latency, lat);
+ if (lat >= g_conf()->mds_op_complaint_time) {
+ mds->logger->inc(l_mds_slow_reply);
+ }
if (session && client_inst.name.is_client()) {
mds->sessionmap.hit_session(session);
}
mds->send_message_client(reply, session);
}
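+ // stray reintegration issues a rename as an internal client request from
+ // another MDS; route the (safe) reply back over that MDS's connection so
+ // its handle_client_reply() can clear the REINTEGRATING state.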
+ if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+ mds->send_message(reply, mdr->client_request->get_connection());
+ }
+
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
realm = in->find_snaprealm();
else
realm = dn->get_dir()->get_inode()->find_snaprealm();
- reply->snapbl = realm->get_snap_trace();
+ reply->snapbl = get_snap_trace(session, realm);
dout(10) << "set_trace_dist snaprealm " << *realm << " len=" << reply->snapbl.length() << dendl;
}
dout(20) << "set_trace_dist added dir " << *dir << dendl;
encode(dn->get_name(), bl);
-
- int lease_mask = 0;
- CDentry::linkage_t *dnl = dn->get_linkage(mdr->get_client(), mdr);
- if (dnl->is_primary()) {
- ceph_assert(dnl->get_inode() == in);
- lease_mask = CEPH_LEASE_PRIMARY_LINK;
- } else {
- if (dnl->is_remote())
- ceph_assert(dnl->get_remote_ino() == in->ino());
- else
- ceph_assert(!in);
- }
- mds->locker->issue_client_lease(dn, mdr, lease_mask, now, bl);
- dout(20) << "set_trace_dist added dn " << snapid << " " << *dn << dendl;
+ mds->locker->issue_client_lease(dn, in, mdr, now, bl);
} else
reply->head.is_dentry = 0;
bool sessionclosed_isok = replay_unsafe_with_closed_session;
// active session?
Session *session = 0;
- if (req->get_source().is_client()) {
+ if (req->is_a_client()) {
session = mds->get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
// process embedded cap releases?
// (only if NOT replay!)
- if (!req->releases.empty() && req->get_source().is_client() && !req->is_replay()) {
+ if (!req->releases.empty() && req->is_a_client() && !req->is_replay()) {
client_t client = req->get_source().num();
for (const auto &r : req->releases) {
mds->locker->process_request_cap_release(mdr, client, r.item, r.dname);
return;
}
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+ dout(4) << "handle_client_reply " << *reply << dendl;
+
+ ceph_assert(reply->is_safe());
+ ceph_tid_t tid = reply->get_tid();
+
+ if (mds->internal_client_requests.count(tid) == 0) {
+ dout(1) << " no pending request on tid " << tid << dendl;
+ return;
+ }
+
+ auto &req = mds->internal_client_requests.at(tid);
+ CDentry *dn = req.get_dentry();
+
+ switch (reply->get_op()) {
+ case CEPH_MDS_OP_RENAME:
+ if (dn) {
+ dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+ mds->queue_waiters(finished);
+ }
+ break;
+ default:
+ dout(5) << " unknown client op " << reply->get_op() << dendl;
+ }
+
+ mds->internal_client_requests.erase(tid);
+}
+
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
CDentry *straydn = NULL;
if (m->straybl.length() > 0) {
- mdcache->decode_replica_stray(straydn, m->straybl, from);
+ mdcache->decode_replica_stray(straydn, nullptr, m->straybl, from);
ceph_assert(straydn);
m->straybl.clear();
}
break;
default:
- ceph_abort();
+ ceph_abort_msg("unknown op " + to_string(m->get_op()) + " requested");
}
}
if (!lock) {
dout(10) << "don't have object, dropping" << dendl;
- ceph_abort(); // can this happen, if we auth pinned properly.
+ ceph_abort_msg("don't have object"); // can this happen, if we auth pinned properly.
}
if (op == MMDSPeerRequest::OP_XLOCK && !lock->get_parent()->is_auth()) {
dout(10) << "not auth for remote xlock attempt, dropping on "
break;
default:
- ceph_abort();
+ ceph_abort_msg("unknown op "+ to_string(op)+ " received");
}
}
// while session is opening.
bool allow_prealloc_inos = mdr->session->is_open();
+ inodeno_t _useino = useino;
+
// assign ino
- if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
- mds->sessionmap.mark_projected(mdr->session);
- dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
- << " (" << mdr->session->info.prealloc_inos.size() << " left)"
- << dendl;
- } else {
- mdr->alloc_ino =
- _inode->ino = mds->inotable->project_alloc_id(useino);
- dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
- }
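+ // retry loop: if test_and_clear_taken_inos() reports the chosen ino was
+ // already taken (e.g. by a replayed request), discard it and pick another;
+ // _useino is zeroed so the client's hint is only honoured on the first pass.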
+ do {
+ if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ mds->sessionmap.mark_projected(mdr->session);
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << dendl;
+ }
+ } else {
+ mdr->alloc_ino =
+ _inode->ino = mds->inotable->project_alloc_id(_useino);
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ mds->inotable->apply_alloc_id(_inode->ino);
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+ }
+ }
+ _useino = 0;
+ } while (!_inode->ino);
if (useino && useino != _inode->ino) {
dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
<< " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
//ceph_abort(); // just for now.
}
-
+
if (allow_prealloc_inos &&
mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
_inode->truncate_seq = 1; /* starting with 1, 0 is kept for no-truncation logic */
CInode *diri = dir->get_inode();
+ auto pip = diri->get_projected_inode();
- dout(10) << oct << " dir mode 0" << diri->get_inode()->mode << " new mode 0" << mode << dec << dendl;
+ dout(10) << oct << " dir mode 0" << pip->mode << " new mode 0" << mode << dec << dendl;
- if (diri->get_inode()->mode & S_ISGID) {
+ if (pip->mode & S_ISGID) {
dout(10) << " dir is sticky" << dendl;
- _inode->gid = diri->get_inode()->gid;
+ _inode->gid = pip->gid;
if (S_ISDIR(mode)) {
- dout(10) << " new dir also sticky" << dendl;
+ dout(10) << " new dir also sticky" << dendl;
_inode->mode |= S_ISGID;
}
- } else
+ } else {
_inode->gid = mdr->client_request->get_caller_gid();
+ }
_inode->uid = mdr->client_request->get_caller_uid();
_inode->change_attr = 0;
const cref_t<MClientRequest> &req = mdr->client_request;
+
+ dout(10) << "copying fscrypt_auth len " << req->fscrypt_auth.size() << dendl;
+ _inode->fscrypt_auth = req->fscrypt_auth;
+ _inode->fscrypt_file = req->fscrypt_file;
+
if (req->get_data().length()) {
auto p = req->get_data().cbegin();
auto _xattrs = CInode::allocate_xattr_map();
decode_noshare(*_xattrs, p);
dout(10) << "prepare_new_inode setting xattrs " << *_xattrs << dendl;
- if (_xattrs->count("encryption.ctx")) {
- _inode->fscrypt = true;
- }
in->reset_xattrs(std::move(_xattrs));
}
}
}
+struct C_MDS_TryOpenInode : public ServerContext {
+ MDRequestRef mdr;
+ inodeno_t ino;
+ C_MDS_TryOpenInode(Server *s, MDRequestRef& r, inodeno_t i) :
+ ServerContext(s), mdr(r), ino(i) {}
+ void finish(int r) override {
+ server->_try_open_ino(mdr, r, ino);
+ }
+};
+
+void Server::_try_open_ino(MDRequestRef& mdr, int r, inodeno_t ino)
+{
+ dout(10) << "_try_open_ino " << mdr.get() << " ino " << ino << " r=" << r << dendl;
+
+ // `r` is a rank if >=0, else an error code
+ if (r >= 0) {
+ mds_rank_t dest_rank(r);
+ if (dest_rank == mds->get_nodeid())
+ dispatch_client_request(mdr);
+ else
+ mdcache->request_forward(mdr, dest_rank);
+ return;
+ }
+
+ // give up
+ if (r == -CEPHFS_ENOENT || r == -CEPHFS_ENODATA)
+ r = -CEPHFS_ESTALE;
+ respond_to_request(mdr, r);
+}
+
class C_MDS_TryFindInode : public ServerContext {
MDRequestRef mdr;
+ MDCache *mdcache;
+ inodeno_t ino;
public:
- C_MDS_TryFindInode(Server *s, MDRequestRef& r) : ServerContext(s), mdr(r) {}
+ C_MDS_TryFindInode(Server *s, MDRequestRef& r, MDCache *m, inodeno_t i) :
+ ServerContext(s), mdr(r), mdcache(m), ino(i) {}
void finish(int r) override {
- if (r == -CEPHFS_ESTALE) // :( find_ino_peers failed
- server->respond_to_request(mdr, r);
- else
+ if (r == -CEPHFS_ESTALE) { // :( find_ino_peers failed
+ /*
+ * There is one case where, if the MDS crashes and the
+ * openfiletable journal couldn't be flushed, the replacing
+ * MDS possibly won't load some already opened CInodes into
+ * the MDCache. If the clients then retry those requests after
+ * reconnecting, the MDS will return -ESTALE after failing to
+ * find the ino in all active peers.
+ *
+ * As a workaround users can run `ls -R ${mountpoint}`
+ * to list all the sub-files or sub-directories from the
+ * mountpoint.
+ *
+ * We need to try to open the ino here and then retry the
+ * request.
+ */
+ CInode *in = mdcache->get_inode(ino);
+ if (in && in->state_test(CInode::STATE_PURGING))
+ server->respond_to_request(mdr, r);
+ else
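+ // pool -1: no specific data pool is known here, so open_ino() falls back
+ // to its own backtrace search to locate the inode.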
+ mdcache->open_ino(ino, (int64_t)-1, new C_MDS_TryOpenInode(server, mdr, ino));
+ } else {
server->dispatch_client_request(mdr);
+ }
}
};
respond_to_request(mdr, r);
} else if (r == -CEPHFS_ESTALE) {
dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
- MDSContext *c = new C_MDS_TryFindInode(this, mdr);
- mdcache->find_ino_peers(refpath.get_ino(), c);
+ inodeno_t ino = refpath.get_ino();
+ mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
} else {
dout(10) << "FAIL on error " << r << dendl;
respond_to_request(mdr, r);
/** rdlock_path_xlock_dentry
* traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
* get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if the caller requires the target inode to be auth when
+ * it exists. with authexist the tail dentry is no longer guaranteed to be
+ * auth, because the tail dentry and the target inode cannot both be
+ * guaranteed auth on one mds; nor is the tail dentry xlocked when authexist
+ * is set and the target inode exists.
*/
CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
- bool create, bool okexist, bool want_layout)
+ bool create, bool okexist, bool authexist,
+ bool want_layout)
{
const filepath& refpath = mdr->get_filepath();
dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
if (create)
flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+ if (authexist)
+ flags |= MDS_TRAVERSE_WANT_INODE;
if (want_layout)
flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
if (r < 0) {
if (r == -CEPHFS_ESTALE) {
dout(10) << "FAIL on CEPHFS_ESTALE but attempting recovery" << dendl;
- mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+ inodeno_t ino = refpath.get_ino();
+ mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
return nullptr;
}
respond_to_request(mdr, r);
CInode *diri = dir->get_inode();
if (!mdr->reqid.name.is_mds()) {
- if (diri->is_system() && !diri->is_root()) {
+ if (diri->is_system() && !diri->is_root() &&
+ (!diri->is_lost_and_found() ||
+ mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
respond_to_request(mdr, -CEPHFS_EROFS);
return nullptr;
}
if (r != 0) {
if (r == -CEPHFS_ESTALE) {
dout(10) << "CEPHFS_ESTALE on path, attempting recovery" << dendl;
- mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+ inodeno_t ino = refpath.get_ino();
+ mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
} else if (r < 0) {
respond_to_request(mdr, r);
}
if (r != 0) {
if (r == -CEPHFS_ESTALE) {
dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
- mdcache->find_ino_peers(refpath2.get_ino(), new C_MDS_TryFindInode(this, mdr));
+ inodeno_t ino = refpath2.get_ino();
+ mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
} else if (r < 0) {
respond_to_request(mdr, r);
}
if (!ref)
return;
- mdr->getattr_caps = mask;
-
/*
* if client currently holds the EXCL cap on a field, do not rdlock
* it; client's stat() will result in valid info if _either_ EXCL
// value for them. (currently this matters for xattrs and inline data)
mdr->getattr_caps = mask;
- mds->balancer->hit_inode(ref, META_POP_IRD, req->get_source().num());
+ mds->balancer->hit_inode(ref, META_POP_IRD);
// reply
dout(10) << "reply to stat on " << *req << dendl;
if (cmode & CEPH_FILE_MODE_WR)
mds->balancer->hit_inode(cur, META_POP_IWR);
else
- mds->balancer->hit_inode(cur, META_POP_IRD,
- mdr->client_request->get_source().num());
+ mds->balancer->hit_inode(cur, META_POP_IRD);
CDentry *dn = 0;
if (req->get_dentry_wanted()) {
void finish(int r) override {
ceph_assert(r == 0);
+ // crash the current MDS so that the replacing MDS will test journal replay
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
dn->pop_projected_linkage();
// dirty inode, dn, dir
}
bool excl = req->head.args.open.flags & CEPH_O_EXCL;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
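+ // the added 'true' is authexist: when the file already exists we need the
+ // target inode (not necessarily the tail dentry) to be auth, and the tail
+ // dentry may then be only rdlocked -- see the assertion below.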
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (!excl && !dnl->is_null()) {
// it existed.
- mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+ ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
MutationImpl::LockOpVec lov;
lov.add_rdlock(&dnl->get_inode()->snaplock);
// this isn't perfect, but we should capture the main variable/unbounded size items!
int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
int bytes_left = max_bytes - front_bytes;
- bytes_left -= realm->get_snap_trace().length();
+ bytes_left -= get_snap_trace(session, realm).length();
// build dir contents
bufferlist dnbl;
bool dnp = dn->use_projected(client, mdr);
CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
- if (dnl->is_null())
+ if (dnl->is_null()) {
+ if (dn->get_num_ref() == 0 && !dn->is_projected())
+ dir->remove_dentry(dn);
continue;
+ }
if (dn->last < snapid || dn->first > snapid) {
dout(20) << "skipping non-overlapping snap " << *dn << dendl;
// dentry
dout(12) << "including dn " << *dn << dendl;
encode(dn->get_name(), dnbl);
- int lease_mask = dnl->is_primary() ? CEPH_LEASE_PRIMARY_LINK : 0;
- mds->locker->issue_client_lease(dn, mdr, lease_mask, now, dnbl);
+ mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
// inode
dout(12) << "including inode " << *in << dendl;
mdr->reply_extra_bl = dirbl;
// bump popularity. NOTE: this doesn't quite capture it.
- mds->balancer->hit_dir(dir, META_POP_READDIR, -1, numfiles);
+ mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
// reply
mdr->tracei = diri;
__u32 mask = req->head.args.setattr.mask;
__u32 access_mask = MAY_WRITE;
+ if (req->get_header().version < 6) {
+ // No changes to fscrypted inodes by downrevved clients
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EPERM);
+ return;
+ }
+
+ // Only allow fscrypt field changes by capable clients
+ if (mask & (CEPH_SETATTR_FSCRYPT_FILE|CEPH_SETATTR_FSCRYPT_AUTH)) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+ }
+
// xlock inode
- if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
+ if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID|CEPH_SETATTR_FSCRYPT_AUTH|CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID))
lov.add_xlock(&cur->authlock);
- if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
+ if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE|CEPH_SETATTR_FSCRYPT_FILE))
lov.add_xlock(&cur->filelock);
if (mask & CEPH_SETATTR_CTIME)
lov.add_wrlock(&cur->versionlock);
bool truncating_smaller = false;
if (mask & CEPH_SETATTR_SIZE) {
- truncating_smaller = req->head.args.setattr.size < old_size;
+ if (req->get_data().length() >
+ sizeof(struct ceph_fscrypt_last_block_header) + fscrypt_last_block_max_size) {
+ dout(10) << __func__ << ": the last block size is too large" << dendl;
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
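+ // with fscrypt a truncate to the same size can still shrink the file: the
+ // logical EOF moves within the last encrypted block, and the client
+ // attaches the re-encrypted last block as request data.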
+ truncating_smaller = req->head.args.setattr.size < old_size ||
+ (req->head.args.setattr.size == old_size && req->get_data().length());
if (truncating_smaller && pip->is_truncating()) {
dout(10) << " waiting for pending truncate from " << pip->truncate_from
<< " to " << pip->truncate_size << " to complete on " << *cur << dendl;
cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
return;
}
+
+ if (truncating_smaller && req->get_data().length()) {
+ struct ceph_fscrypt_last_block_header header;
+ memset(&header, 0, sizeof(header));
+ auto bl = req->get_data().cbegin();
+ DECODE_START(1, bl);
+ decode(header.change_attr, bl);
+ DECODE_FINISH(bl);
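+ // only change_attr is decoded above; file_offset and block_size keep the
+ // zeroes from the memset (DECODE_FINISH skips any remaining fields), so
+ // the dout below reports them as 0.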
+
+ dout(20) << __func__ << " mdr->retry:" << mdr->retry
+ << " header.change_attr: " << header.change_attr
+ << " header.file_offset: " << header.file_offset
+ << " header.block_size: " << header.block_size
+ << dendl;
+
+ if (header.change_attr != pip->change_attr) {
+ dout(5) << __func__ << ": header.change_attr:" << header.change_attr
+ << " != current change_attr:" << pip->change_attr
+ << ", let client retry it!" << dendl;
+ // flush the journal to make sure the client gets the latest
+ // change_attr possible for the next retry
+ mds->mdlog->flush();
+ respond_to_request(mdr, -CEPHFS_EAGAIN);
+ return;
+ }
+ }
}
bool changed_ranges = false;
if (mask & CEPH_SETATTR_MODE)
pi.inode->mode = (pi.inode->mode & ~07777) | (req->head.args.setattr.mode & 07777);
- else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
- S_ISREG(pi.inode->mode) &&
- (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
- pi.inode->mode &= ~(S_ISUID|S_ISGID);
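+ // chown/chgrp on an executable regular file clears both setuid and setgid
+ // (the POSIX rule); the explicit KILL_SUID/KILL_SGID masks instead clear
+ // each bit individually, even when no exec bits are set.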
+ else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID|
+ CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID)) &&
+ S_ISREG(pi.inode->mode)) {
+ if (mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID) &&
+ (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+ pi.inode->mode &= ~(S_ISUID|S_ISGID);
+ } else {
+ if (mask & CEPH_SETATTR_KILL_SUID) {
+ pi.inode->mode &= ~S_ISUID;
+ }
+ if (mask & CEPH_SETATTR_KILL_SGID) {
+ pi.inode->mode &= ~S_ISGID;
+ }
+ }
}
if (mask & CEPH_SETATTR_MTIME)
pi.inode->time_warp_seq++; // maybe not a timewarp, but still a serialization point.
if (mask & CEPH_SETATTR_SIZE) {
if (truncating_smaller) {
- pi.inode->truncate(old_size, req->head.args.setattr.size);
+ pi.inode->truncate(old_size, req->head.args.setattr.size, req->get_data());
le->metablob.add_truncate_start(cur->ino());
} else {
pi.inode->size = req->head.args.setattr.size;
}
}
+ if (mask & CEPH_SETATTR_FSCRYPT_AUTH)
+ pi.inode->fscrypt_auth = req->fscrypt_auth;
+ if (mask & CEPH_SETATTR_FSCRYPT_FILE)
+ pi.inode->fscrypt_file = req->fscrypt_file;
+
pi.inode->version = cur->pre_dirty();
pi.inode->ctime = mdr->get_op_stamp();
if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
{
const cref_t<MClientRequest> &req = mdr->client_request;
+ MutationImpl::LockOpVec lov;
string name(req->get_path2());
bufferlist bl = req->get_data();
string value (bl.c_str(), bl.length());
if (!xlock_policylock(mdr, cur, true))
return;
+ /* We need 'As' caps for the fscrypt context */
+ lov.add_xlock(&cur->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov)) {
+ return;
+ }
+
+ /* encrypted directories can't have their layout changed */
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
file_layout_t layout;
if (cur->get_projected_inode()->has_layout())
layout = cur->get_projected_inode()->layout;
if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
return;
- MutationImpl::LockOpVec lov;
lov.add_xlock(&cur->filelock);
if (!mds->locker->acquire_locks(mdr, lov))
return;
+ /* encrypted files can't have their layout changed */
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
auto pi = cur->project_inode(mdr);
int64_t old_pool = pi.inode->layout.pool_id;
pi.inode->add_old_pool(old_pool);
return;
}
- if (quota.is_enable() && !cur->get_projected_srnode())
+ if (quota.is_enabled() && !cur->get_projected_srnode())
adjust_realm = true;
if (!xlock_policylock(mdr, cur, false, adjust_realm))
*/
if (!mdr->more()->rdonly_checks) {
if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
- MutationImpl::LockOpVec lov;
lov.add_rdlock(&cur->snaplock);
if (!mds->locker->acquire_locks(mdr, lov))
return;
pi.inode->ctime = mdr->get_op_stamp();
if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
pi.inode->rstat.rctime = mdr->get_op_stamp();
- if (name == "encryption.ctx"sv)
- pi.inode->fscrypt = true;
pi.inode->change_attr++;
pi.inode->xattr_version++;
// ------------------------------------------------
+struct C_WaitUnlinkToFinish : public MDSContext {
+protected:
+ MDCache *mdcache;
+ CDentry *dn;
+ MDSContext *fin;
+
+ MDSRank *get_mds() override
+ {
+ ceph_assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+
+public:
+ C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+ mdcache(m), dn(d), fin(f) {}
+ void finish(int r) override {
+ fin->complete(r);
+ dn->put(CDentry::PIN_PURGING);
+ }
+};
+
+bool Server::is_unlink_pending(CDentry *dn)
+{
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
+ if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
+ return true;
+ }
+ return false;
+}
+
+void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
+{
+ dout(20) << __func__ << " dn " << *dn << dendl;
+ mds->locker->drop_locks(mdr.get());
+ auto fin = new C_MDS_RetryRequest(mdcache, mdr);
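+ // pin the dentry so it cannot be trimmed while the waiter is parked;
+ // C_WaitUnlinkToFinish drops this pin when it fires.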
+ dn->get(CDentry::PIN_PURGING);
+ dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
+}
+
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+ MDCache *mdcache;
+ CDentry *dn;
+ MDSContext *fin;
+
+ MDSRank *get_mds() override
+ {
+ ceph_assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+
+public:
+ C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+ mdcache(m), dn(d), fin(f) {}
+ void finish(int r) override {
+ fin->complete(r);
+ dn->put(CDentry::PIN_PURGING);
+ }
+};
+
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
+ if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
+ return true;
+ }
+ return false;
+}
+
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+ dout(20) << __func__ << " dn " << *dn << dendl;
+ mds->locker->drop_locks(mdr.get());
+ auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+ dn->get(CDentry::PIN_PURGING);
+ dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
void finish(int r) override {
ceph_assert(r == 0);
+ // crash the current MDS so that the replacing MDS will test journal replay
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
// link the inode
dn->pop_projected_linkage();
mode |= S_IFREG;
mdr->disable_lock_cache();
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!check_access(mdr, diri, MAY_WRITE))
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!dn)
return;
+ if (is_unlink_pending(dn)) {
+ wait_for_pending_unlink(dn, mdr);
+ return;
+ }
+
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
mds->balancer->maybe_fragment(dir, false);
+
+ // flush the journal as soon as possible
+ if (g_conf()->mds_kill_skip_replaying_inotable) {
+ mdlog->flush();
+ }
}
targeti = mdcache->get_inode(req->get_filepath2().get_ino());
if (!targeti) {
dout(10) << "CEPHFS_ESTALE on path2, attempting recovery" << dendl;
- mdcache->find_ino_peers(req->get_filepath2().get_ino(), new C_MDS_TryFindInode(this, mdr));
+ inodeno_t ino = req->get_filepath2().get_ino();
+ mdcache->find_ino_peers(ino, new C_MDS_TryFindInode(this, mdr, mdcache, ino));
return;
}
mdr->pin(targeti);
targeti = ret.second->get_projected_linkage()->get_inode();
}
+ if (is_unlink_pending(destdn)) {
+ wait_for_pending_unlink(destdn, mdr);
+ return;
+ }
+
ceph_assert(destdn->get_projected_linkage()->is_null());
if (req->get_alternate_name().size() > alternate_name_max) {
dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
if (target_pin != dir->inode &&
target_realm->get_subvolume_ino() !=
dir->inode->find_snaprealm()->get_subvolume_ino()) {
+ if (target_pin->is_stray()) {
+ mds->locker->drop_locks(mdr.get());
+ targeti->add_waiter(CInode::WAIT_UNLINK,
+ new C_MDS_RetryRequest(mdcache, mdr));
+ mdlog->flush();
+ return;
+ }
dout(7) << "target is in different subvolume, failing..." << dendl;
respond_to_request(mdr, -CEPHFS_EXDEV);
return;
mdr->apply();
MDRequestRef null_ref;
- if (inc)
+ if (inc) {
mdcache->send_dentry_link(dn, null_ref);
- else
+ } else {
+ dn->state_clear(CDentry::STATE_UNLINKING);
mdcache->send_dentry_unlink(dn, NULL, null_ref);
-
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+ mdcache->mds->queue_waiters(finished);
+ }
+
// bump target popularity
mds->balancer->hit_inode(targeti, META_POP_IWR);
mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
if (rmdir)
mdr->disable_lock_cache();
+
CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
if (!dn)
return;
+ if (is_reintegrate_pending(dn)) {
+ wait_for_pending_reintegrate(dn, mdr);
+ return;
+ }
+
+ // notify replica MDSes the dentry is under unlink
+ if (!dn->state_test(CDentry::STATE_UNLINKING)) {
+ dn->state_set(CDentry::STATE_UNLINKING);
+ mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
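+ // if any replicas had to be notified, park the request; it is re-dispatched
+ // once the replica acks drop replica_unlinking_ref back to zero.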
+ if (dn->replica_unlinking_ref) {
+ return;
+ }
+ }
+
CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
ceph_assert(!dnl->is_null());
CInode *in = dnl->get_inode();
if (rmdir) {
// do empty directory checks
if (_dir_is_nonempty_unlocked(mdr, in)) {
- respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+ dn->state_clear(CDentry::STATE_UNLINKING);
+ respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
return;
}
} else {
dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
+ dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_EISDIR);
return;
}
if (rmdir) {
// unlink
dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
+ dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_ENOTDIR);
return;
}
CInode *diri = dn->get_dir()->get_inode();
if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
- if (!check_access(mdr, diri, MAY_WRITE))
+ if (!check_access(mdr, diri, MAY_WRITE)) {
+ dn->state_clear(CDentry::STATE_UNLINKING);
return;
+ }
}
// -- create stray dentry? --
if (in->is_dir() &&
_dir_is_nonempty(mdr, in)) {
respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+ dn->state_clear(CDentry::STATE_UNLINKING);
return;
}
}
mdr->apply();
-
+
+ dn->state_clear(CDentry::STATE_UNLINKING);
mdcache->send_dentry_unlink(dn, straydn, mdr);
-
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
+ mdcache->mds->queue_waiters(finished);
+
if (straydn) {
// update subtree map?
if (strayin->is_dir())
// reply
respond_to_request(mdr, 0);
-
+
// removing a new dn?
dn->get_dir()->try_remove_unlinked_dn(dn);
if (!destdn)
return;
+ if (is_unlink_pending(destdn)) {
+ wait_for_pending_unlink(destdn, mdr);
+ return;
+ }
+
+ if (is_unlink_pending(srcdn)) {
+ wait_for_pending_unlink(srcdn, mdr);
+ return;
+ }
+
dout(10) << " destdn " << *destdn << dendl;
CDir *destdir = destdn->get_dir();
ceph_assert(destdir->is_auth());
C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
journal_and_reply(mdr, srci, destdn, le, fin);
+
+ // flush the mdlog when reintegrating or migrating the stray dn,
+ // because link requests may be waiting on it.
+ if (srcdn->get_dir()->inode->is_stray()) {
+ mdlog->flush();
+ }
mds->balancer->maybe_fragment(destdn->get_dir(), false);
}
mdcache->journal_cow_dentry(mdr.get(), metablob, destdn, CEPH_NOSNAP, 0, destdnl);
destdn->first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
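+ // failure injection: with probability inject_rename_corrupt_dentry_first,
+ // stamp a bogus snapid onto the dest dentry to exercise corrupt-dentry
+ // handling.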
+ {
+ auto do_corruption = inject_rename_corrupt_dentry_first;
+ if (unlikely(do_corruption > 0.0)) {
+ auto r = ceph::util::generate_random_number(0.0, 1.0);
+ if (r < do_corruption) {
+ dout(0) << "corrupting dn: " << *destdn << dendl;
+ destdn->first = -10;
+ }
+ }
+ }
if (destdn->is_auth())
metablob->add_primary_dentry(destdn, srci, true, true);
srcdn->get_dir()->unlink_inode(srcdn);
+ // After the stray dn is unlinked from the corresponding inode (the
+ // reintegrate_stray/migrate_stray case), wake up the waiters.
+ MDSContext::vec finished;
+ in->take_waiting(CInode::WAIT_UNLINK, finished);
+ if (!finished.empty()) {
+ mds->queue_waiters(finished);
+ }
+
// dest
if (srcdn_was_remote) {
if (!linkmerge) {
return;
}
if (snapname.length() == 0 ||
+ snapname.length() > snapshot_name_max ||
snapname[0] == '_') {
respond_to_request(mdr, -CEPHFS_EINVAL);
return;
em.first->second = info;
newsnap.seq = snapid;
newsnap.last_created = snapid;
+ newsnap.last_modified = info.stamp;
+ newsnap.change_attr++;
// journal the inode changes
mdr->ls = mdlog->get_current_segment();
}
snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
dout(10) << " snapname " << snapname << " is " << snapid << dendl;
-
if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
MutationImpl::LockOpVec lov;
lov.add_xlock(&diri->snaplock);
newnode.snaps.erase(snapid);
newnode.seq = seq;
newnode.last_destroyed = seq;
+ newnode.last_modified = mdr->get_op_stamp();
+ newnode.change_attr++;
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
le->metablob.add_table_transaction(TABLE_SNAP, stid);
{
dout(10) << "_rmsnap_finish " << *mdr << " " << snapid << dendl;
snapid_t stid = mdr->more()->stid;
- auto p = mdr->more()->snapidbl.cbegin();
- snapid_t seq;
- decode(seq, p);
mdr->apply();
// yay
mdr->in[0] = diri;
+ mdr->tracei = diri;
+ mdr->snapid = snapid;
respond_to_request(mdr, 0);
// purge snapshot data
auto it = newsnap.snaps.find(snapid);
ceph_assert(it != newsnap.snaps.end());
it->second.name = dstname;
+ newsnap.last_modified = mdr->get_op_stamp();
+ newsnap.change_attr++;
// journal the inode changes
mdr->ls = mdlog->get_current_segment();
f->dump_stream("client_reconnect_gather") << client_reconnect_gather;
f->close_section();
}
+
+const bufferlist& Server::get_snap_trace(Session *session, SnapRealm *realm) const {
+ ceph_assert(session);
+ ceph_assert(realm);
+ if (session->info.has_feature(CEPHFS_FEATURE_NEW_SNAPREALM_INFO)) {
+ return realm->get_snap_trace_new();
+ } else {
+ return realm->get_snap_trace();
+ }
+}
+
+const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) const {
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
+ return get_snap_trace(session, realm);
+}