#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
+#include "MDSContext.h"
#include "msg/Messenger.h"
#include "common/perf_counters.h"
#include "include/compat.h"
#include "osd/OSDMap.h"
+#include "fscrypt.h"
#include <errno.h>
Server::Server(MDSRank *m, MetricsHandler *metrics_handler) :
mds(m),
mdcache(mds->mdcache), mdlog(mds->mdlog),
+ inject_rename_corrupt_dentry_first(g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first")),
recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
metrics_handler(metrics_handler)
{
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
+ case CEPH_MSG_CLIENT_REPLY:
+ handle_client_reply(ref_cast<MClientReply>(m));
+ return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
uint64_t sseq = 0;
switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
+ if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+ dout(0) << "new sessions are not permitted, enable again via"
+ "`ceph fs set <fs_name> refuse_client_session false`" << dendl;
+ auto reply = make_message<MClientSession>(CEPH_SESSION_REJECT);
+ reply->metadata["error_string"] = "new sessions are not permitted,"
+ " enable again via `ceph fs set"
+ " <fs_name> refuse_client_session false`";
+ mds->send_message(reply, m->get_connection());
+ return;
+ }
if (session->is_opening() ||
session->is_open() ||
session->is_stale() ||
if (mds->locker->revoke_stale_caps(session)) {
mds->locker->remove_stale_leases(session);
finish_flush_session(session, session->get_push_seq());
- auto m = make_message<MClientSession>(CEPH_SESSION_STALE, session->get_push_seq());
+ auto m = make_message<MClientSession>(CEPH_SESSION_STALE);
mds->send_message_client(m, session);
} else {
to_evict.push_back(session);
if (changed.count("mds_alternate_name_max")) {
alternate_name_max = g_conf().get_val<Option::size_t>("mds_alternate_name_max");
}
+ if (changed.count("mds_fscrypt_last_block_max_size")) {
+ fscrypt_last_block_max_size = g_conf().get_val<Option::size_t>("mds_fscrypt_last_block_max_size");
+ }
if (changed.count("mds_dir_max_entries")) {
dir_max_entries = g_conf().get_val<uint64_t>("mds_dir_max_entries");
dout(20) << __func__ << " max entries per directory changed to "
dout(20) << __func__ << " max fragment size changed to "
<< bal_fragment_size_max << dendl;
}
+ if (changed.count("mds_inject_rename_corrupt_dentry_first")) {
+ inject_rename_corrupt_dentry_first = g_conf().get_val<double>("mds_inject_rename_corrupt_dentry_first");
+ }
}
/*
return;
}
+ if(mds->mdsmap->test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+ mds->clog->warn() << "client could not reconnect as"
+ " file system flag refuse_client_session is set";
+ dout(0) << "client cannot reconnect when file system flag"
+ " refuse_client_session is set" << dendl;
+ auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
+ reply->metadata["error_string"] = "client cannot reconnect when file system flag"
+ " refuse_client_session is set";
+ mds->send_message(reply, m->get_connection());
+ return;
+ }
+
if (!session->is_open()) {
dout(0) << " ignoring msg from not-open session" << *m << dendl;
auto reply = make_message<MClientSession>(CEPH_SESSION_CLOSE);
mds->send_message_client(reply, session);
}
+ if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+ mds->send_message(reply, mdr->client_request->get_connection());
+ }
+
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
realm = in->find_snaprealm();
else
realm = dn->get_dir()->get_inode()->find_snaprealm();
- reply->snapbl = realm->get_snap_trace();
+ reply->snapbl = get_snap_trace(session, realm);
dout(10) << "set_trace_dist snaprealm " << *realm << " len=" << reply->snapbl.length() << dendl;
}
return;
}
+/*
+ * Handle a client reply addressed to this MDS for one of its own
+ * internal client requests (e.g. the rename issued while reintegrating
+ * a stray dentry).  Looks up the pending internal request by tid; for a
+ * rename reply, clears STATE_REINTEGRATING on the recorded dentry and
+ * wakes anything queued on WAIT_REINTEGRATE_FINISH.  The pending-request
+ * entry is erased in all cases, including unknown ops.
+ */
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+ dout(4) << "handle_client_reply " << *reply << dendl;
+
+ // internal requests only ever expect the final (safe) reply
+ ceph_assert(reply->is_safe());
+ ceph_tid_t tid = reply->get_tid();
+
+ if (mds->internal_client_requests.count(tid) == 0) {
+ dout(1) << " no pending request on tid " << tid << dendl;
+ return;
+ }
+
+ auto &req = mds->internal_client_requests.at(tid);
+ CDentry *dn = req.get_dentry();
+
+ switch (reply->get_op()) {
+ case CEPH_MDS_OP_RENAME:
+ if (dn) {
+ // reintegration is done; let blocked requests retry
+ dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+ mds->queue_waiters(finished);
+ }
+ break;
+ default:
+ dout(5) << " unknown client op " << reply->get_op() << dendl;
+ }
+
+ // drop the bookkeeping entry regardless of op
+ mds->internal_client_requests.erase(tid);
+}
+
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
// while session is opening.
bool allow_prealloc_inos = mdr->session->is_open();
+ inodeno_t _useino = useino;
+
// assign ino
- if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
- mds->sessionmap.mark_projected(mdr->session);
- dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
- << " (" << mdr->session->info.prealloc_inos.size() << " left)"
- << dendl;
- } else {
- mdr->alloc_ino =
- _inode->ino = mds->inotable->project_alloc_id(useino);
- dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
- }
+ do {
+ if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ mds->sessionmap.mark_projected(mdr->session);
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << dendl;
+ }
+ } else {
+ mdr->alloc_ino =
+ _inode->ino = mds->inotable->project_alloc_id(_useino);
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ mds->inotable->apply_alloc_id(_inode->ino);
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+ }
+ }
+ _useino = 0;
+ } while (!_inode->ino);
if (useino && useino != _inode->ino) {
dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
<< " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
//ceph_abort(); // just for now.
}
-
+
if (allow_prealloc_inos &&
mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
_inode->change_attr = 0;
const cref_t<MClientRequest> &req = mdr->client_request;
+
+ dout(10) << "copying fscrypt_auth len " << req->fscrypt_auth.size() << dendl;
+ _inode->fscrypt_auth = req->fscrypt_auth;
+ _inode->fscrypt_file = req->fscrypt_file;
+
if (req->get_data().length()) {
auto p = req->get_data().cbegin();
auto _xattrs = CInode::allocate_xattr_map();
decode_noshare(*_xattrs, p);
dout(10) << "prepare_new_inode setting xattrs " << *_xattrs << dendl;
- if (_xattrs->count("encryption.ctx")) {
- _inode->fscrypt = true;
- }
in->reset_xattrs(std::move(_xattrs));
}
/** rdlock_path_xlock_dentry
* traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
* get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
*/
CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
- bool create, bool okexist, bool want_layout)
+ bool create, bool okexist, bool authexist,
+ bool want_layout)
{
const filepath& refpath = mdr->get_filepath();
dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
if (create)
flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+ if (authexist)
+ flags |= MDS_TRAVERSE_WANT_INODE;
if (want_layout)
flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
CInode *diri = dir->get_inode();
if (!mdr->reqid.name.is_mds()) {
- if (diri->is_system() && !diri->is_root()) {
+ if (diri->is_system() && !diri->is_root() &&
+ (!diri->is_lost_and_found() ||
+ mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
respond_to_request(mdr, -CEPHFS_EROFS);
return nullptr;
}
if (!ref)
return;
- mdr->getattr_caps = mask;
-
/*
* if client currently holds the EXCL cap on a field, do not rdlock
* it; client's stat() will result in valid info if _either_ EXCL
// value for them. (currently this matters for xattrs and inline data)
mdr->getattr_caps = mask;
- mds->balancer->hit_inode(ref, META_POP_IRD, req->get_source().num());
+ mds->balancer->hit_inode(ref, META_POP_IRD);
// reply
dout(10) << "reply to stat on " << *req << dendl;
if (cmode & CEPH_FILE_MODE_WR)
mds->balancer->hit_inode(cur, META_POP_IWR);
else
- mds->balancer->hit_inode(cur, META_POP_IRD,
- mdr->client_request->get_source().num());
+ mds->balancer->hit_inode(cur, META_POP_IRD);
CDentry *dn = 0;
if (req->get_dentry_wanted()) {
void finish(int r) override {
ceph_assert(r == 0);
+ // crash current MDS and the replacing MDS will test the journal
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
dn->pop_projected_linkage();
// dirty inode, dn, dir
}
bool excl = req->head.args.open.flags & CEPH_O_EXCL;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
if (!dn)
return;
CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (!excl && !dnl->is_null()) {
// it existed.
- mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+ ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
MutationImpl::LockOpVec lov;
lov.add_rdlock(&dnl->get_inode()->snaplock);
// this isn't perfect, but we should capture the main variable/unbounded size items!
int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
int bytes_left = max_bytes - front_bytes;
- bytes_left -= realm->get_snap_trace().length();
+ bytes_left -= get_snap_trace(session, realm).length();
// build dir contents
bufferlist dnbl;
bool dnp = dn->use_projected(client, mdr);
CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
- if (dnl->is_null())
+ if (dnl->is_null()) {
+ if (dn->get_num_ref() == 0 && !dn->is_projected())
+ dir->remove_dentry(dn);
continue;
+ }
if (dn->last < snapid || dn->first > snapid) {
dout(20) << "skipping non-overlapping snap " << *dn << dendl;
mdr->reply_extra_bl = dirbl;
// bump popularity. NOTE: this doesn't quite capture it.
- mds->balancer->hit_dir(dir, META_POP_READDIR, -1, numfiles);
+ mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
// reply
mdr->tracei = diri;
__u32 mask = req->head.args.setattr.mask;
__u32 access_mask = MAY_WRITE;
+ if (req->get_header().version < 6) {
+ // No changes to fscrypted inodes by downrevved clients
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EPERM);
+ return;
+ }
+
+ // Only allow fscrypt field changes by capable clients
+ if (mask & (CEPH_SETATTR_FSCRYPT_FILE|CEPH_SETATTR_FSCRYPT_AUTH)) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+ }
+
// xlock inode
- if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
+ if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID|CEPH_SETATTR_FSCRYPT_AUTH|CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID))
lov.add_xlock(&cur->authlock);
- if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
+ if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE|CEPH_SETATTR_FSCRYPT_FILE))
lov.add_xlock(&cur->filelock);
if (mask & CEPH_SETATTR_CTIME)
lov.add_wrlock(&cur->versionlock);
bool truncating_smaller = false;
if (mask & CEPH_SETATTR_SIZE) {
- truncating_smaller = req->head.args.setattr.size < old_size;
+ if (req->get_data().length() >
+ sizeof(struct ceph_fscrypt_last_block_header) + fscrypt_last_block_max_size) {
+ dout(10) << __func__ << ": the last block size is too large" << dendl;
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
+ truncating_smaller = req->head.args.setattr.size < old_size ||
+ (req->head.args.setattr.size == old_size && req->get_data().length());
if (truncating_smaller && pip->is_truncating()) {
dout(10) << " waiting for pending truncate from " << pip->truncate_from
<< " to " << pip->truncate_size << " to complete on " << *cur << dendl;
cur->add_waiter(CInode::WAIT_TRUNC, new C_MDS_RetryRequest(mdcache, mdr));
return;
}
+
+ if (truncating_smaller && req->get_data().length()) {
+ struct ceph_fscrypt_last_block_header header;
+ memset(&header, 0, sizeof(header));
+ auto bl = req->get_data().cbegin();
+ DECODE_START(1, bl);
+ decode(header.change_attr, bl);
+ DECODE_FINISH(bl);
+
+ dout(20) << __func__ << " mdr->retry:" << mdr->retry
+ << " header.change_attr: " << header.change_attr
+ << " header.file_offset: " << header.file_offset
+ << " header.block_size: " << header.block_size
+ << dendl;
+
+ if (header.change_attr != pip->change_attr) {
+ dout(5) << __func__ << ": header.change_attr:" << header.change_attr
+ << " != current change_attr:" << pip->change_attr
+ << ", let client retry it!" << dendl;
+ // flush the journal to make sure the clients will get the lasted
+ // change_attr as possible for the next retry
+ mds->mdlog->flush();
+ respond_to_request(mdr, -CEPHFS_EAGAIN);
+ return;
+ }
+ }
}
bool changed_ranges = false;
if (mask & CEPH_SETATTR_MODE)
pi.inode->mode = (pi.inode->mode & ~07777) | (req->head.args.setattr.mode & 07777);
- else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID)) &&
- S_ISREG(pi.inode->mode) &&
- (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
- pi.inode->mode &= ~(S_ISUID|S_ISGID);
+ else if ((mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID|
+ CEPH_SETATTR_KILL_SUID|CEPH_SETATTR_KILL_SGID)) &&
+ S_ISREG(pi.inode->mode)) {
+ if (mask & (CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_KILL_SGUID) &&
+ (pi.inode->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
+ pi.inode->mode &= ~(S_ISUID|S_ISGID);
+ } else {
+ if (mask & CEPH_SETATTR_KILL_SUID) {
+ pi.inode->mode &= ~S_ISUID;
+ }
+ if (mask & CEPH_SETATTR_KILL_SGID) {
+ pi.inode->mode &= ~S_ISGID;
+ }
+ }
}
if (mask & CEPH_SETATTR_MTIME)
pi.inode->time_warp_seq++; // maybe not a timewarp, but still a serialization point.
if (mask & CEPH_SETATTR_SIZE) {
if (truncating_smaller) {
- pi.inode->truncate(old_size, req->head.args.setattr.size);
+ pi.inode->truncate(old_size, req->head.args.setattr.size, req->get_data());
le->metablob.add_truncate_start(cur->ino());
} else {
pi.inode->size = req->head.args.setattr.size;
}
}
+ if (mask & CEPH_SETATTR_FSCRYPT_AUTH)
+ pi.inode->fscrypt_auth = req->fscrypt_auth;
+ if (mask & CEPH_SETATTR_FSCRYPT_FILE)
+ pi.inode->fscrypt_file = req->fscrypt_file;
+
pi.inode->version = cur->pre_dirty();
pi.inode->ctime = mdr->get_op_stamp();
if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur)
{
const cref_t<MClientRequest> &req = mdr->client_request;
+ MutationImpl::LockOpVec lov;
string name(req->get_path2());
bufferlist bl = req->get_data();
string value (bl.c_str(), bl.length());
if (!xlock_policylock(mdr, cur, true))
return;
+ /* We need 'As' caps for the fscrypt context */
+ lov.add_xlock(&cur->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov)) {
+ return;
+ }
+
+ /* encrypted directories can't have their layout changed */
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
file_layout_t layout;
if (cur->get_projected_inode()->has_layout())
layout = cur->get_projected_inode()->layout;
if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
return;
- MutationImpl::LockOpVec lov;
lov.add_xlock(&cur->filelock);
if (!mds->locker->acquire_locks(mdr, lov))
return;
+ /* encrypted files can't have their layout changed */
+ if (!cur->get_inode()->fscrypt_auth.empty()) {
+ respond_to_request(mdr, -CEPHFS_EINVAL);
+ return;
+ }
+
auto pi = cur->project_inode(mdr);
int64_t old_pool = pi.inode->layout.pool_id;
pi.inode->add_old_pool(old_pool);
return;
}
- if (quota.is_enable() && !cur->get_projected_srnode())
+ if (quota.is_enabled() && !cur->get_projected_srnode())
adjust_realm = true;
if (!xlock_policylock(mdr, cur, false, adjust_realm))
*/
if (!mdr->more()->rdonly_checks) {
if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
- MutationImpl::LockOpVec lov;
lov.add_rdlock(&cur->snaplock);
if (!mds->locker->acquire_locks(mdr, lov))
return;
pi.inode->ctime = mdr->get_op_stamp();
if (mdr->get_op_stamp() > pi.inode->rstat.rctime)
pi.inode->rstat.rctime = mdr->get_op_stamp();
- if (name == "encryption.ctx"sv)
- pi.inode->fscrypt = true;
pi.inode->change_attr++;
pi.inode->xattr_version++;
dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
}
+/*
+ * Waiter context fired once a pending stray-dentry reintegration on `dn`
+ * finishes.  The caller takes a PIN_PURGING ref on the dentry before
+ * queueing this context; finish() completes the wrapped continuation
+ * first and only then drops that pin, so the dentry stays alive for the
+ * duration of the callback.
+ */
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+ MDCache *mdcache;
+ CDentry *dn;
+ MDSContext *fin;
+
+ MDSRank *get_mds() override
+ {
+ ceph_assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+
+public:
+ C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+ mdcache(m), dn(d), fin(f) {}
+ void finish(int r) override {
+ // complete the deferred work, then release the pin taken by the caller
+ fin->complete(r);
+ dn->put(CDentry::PIN_PURGING);
+ }
+};
+
+/*
+ * Return true if `dn` is (projected-)linked and currently mid-way
+ * through a stray reintegration, i.e. callers should wait for
+ * WAIT_REINTEGRATE_FINISH before operating on it.
+ */
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+ const CDentry::linkage_t *pdnl = dn->get_projected_linkage();
+ return !pdnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING);
+}
+
+/*
+ * Park `mdr` until the in-flight reintegration of `dn` completes.
+ * Drops all of the request's locks first (we are going to block), pins
+ * the dentry so it cannot be trimmed while we wait, and queues a retry
+ * of the request on WAIT_REINTEGRATE_FINISH.  The pin is released by
+ * the waiter context when it fires.
+ */
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+ dout(20) << __func__ << " dn " << *dn << dendl;
+ mds->locker->drop_locks(mdr.get());
+ auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+ // pin before queueing; the waiter drops this ref after completing fin
+ dn->get(CDentry::PIN_PURGING);
+ dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
void finish(int r) override {
ceph_assert(r == 0);
+ // crash current MDS and the replacing MDS will test the journal
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
// link the inode
dn->pop_projected_linkage();
mode |= S_IFREG;
mdr->disable_lock_cache();
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
if (!dn)
return;
journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
mds->balancer->maybe_fragment(dir, false);
+
+ // flush the journal as soon as possible
+ if (g_conf()->mds_kill_skip_replaying_inotable) {
+ mdlog->flush();
+ }
}
if (target_pin != dir->inode &&
target_realm->get_subvolume_ino() !=
dir->inode->find_snaprealm()->get_subvolume_ino()) {
+ if (target_pin->is_stray()) {
+ mds->locker->drop_locks(mdr.get());
+ targeti->add_waiter(CInode::WAIT_UNLINK,
+ new C_MDS_RetryRequest(mdcache, mdr));
+ mdlog->flush();
+ return;
+ }
dout(7) << "target is in different subvolume, failing..." << dendl;
respond_to_request(mdr, -CEPHFS_EXDEV);
return;
if (!dn)
return;
+ if (is_reintegrate_pending(dn)) {
+ wait_for_pending_reintegrate(dn, mdr);
+ return;
+ }
+
// notify replica MDSes the dentry is under unlink
if (!dn->state_test(CDentry::STATE_UNLINKING)) {
dn->state_set(CDentry::STATE_UNLINKING);
C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
journal_and_reply(mdr, srci, destdn, le, fin);
+
+ // trigger to flush mdlog in case reintegrating or migrating the stray dn,
+ // because the link requests maybe waiting.
+ if (srcdn->get_dir()->inode->is_stray()) {
+ mdlog->flush();
+ }
mds->balancer->maybe_fragment(destdn->get_dir(), false);
}
mdcache->journal_cow_dentry(mdr.get(), metablob, destdn, CEPH_NOSNAP, 0, destdnl);
destdn->first = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+ {
+ auto do_corruption = inject_rename_corrupt_dentry_first;
+ if (unlikely(do_corruption > 0.0)) {
+ auto r = ceph::util::generate_random_number(0.0, 1.0);
+ if (r < do_corruption) {
+ dout(0) << "corrupting dn: " << *destdn << dendl;
+ destdn->first = -10;
+ }
+ }
+ }
if (destdn->is_auth())
metablob->add_primary_dentry(destdn, srci, true, true);
srcdn->get_dir()->unlink_inode(srcdn);
+ // After the stray dn being unlinked from the corresponding inode in case of
+ // reintegrate_stray/migrate_stray, just wake up the waitiers.
+ MDSContext::vec finished;
+ in->take_waiting(CInode::WAIT_UNLINK, finished);
+ if (!finished.empty()) {
+ mds->queue_waiters(finished);
+ }
+
// dest
if (srcdn_was_remote) {
if (!linkmerge) {
return;
}
if (snapname.length() == 0 ||
+ snapname.length() > snapshot_name_max ||
snapname[0] == '_') {
respond_to_request(mdr, -CEPHFS_EINVAL);
return;
em.first->second = info;
newsnap.seq = snapid;
newsnap.last_created = snapid;
+ newsnap.last_modified = info.stamp;
+ newsnap.change_attr++;
// journal the inode changes
mdr->ls = mdlog->get_current_segment();
}
snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
dout(10) << " snapname " << snapname << " is " << snapid << dendl;
-
if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
MutationImpl::LockOpVec lov;
lov.add_xlock(&diri->snaplock);
newnode.snaps.erase(snapid);
newnode.seq = seq;
newnode.last_destroyed = seq;
+ newnode.last_modified = mdr->get_op_stamp();
+ newnode.change_attr++;
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
le->metablob.add_table_transaction(TABLE_SNAP, stid);
{
dout(10) << "_rmsnap_finish " << *mdr << " " << snapid << dendl;
snapid_t stid = mdr->more()->stid;
- auto p = mdr->more()->snapidbl.cbegin();
- snapid_t seq;
- decode(seq, p);
mdr->apply();
// yay
mdr->in[0] = diri;
+ mdr->tracei = diri;
+ mdr->snapid = snapid;
respond_to_request(mdr, 0);
// purge snapshot data
auto it = newsnap.snaps.find(snapid);
ceph_assert(it != newsnap.snaps.end());
it->second.name = dstname;
+ newsnap.last_modified = mdr->get_op_stamp();
+ newsnap.change_attr++;
// journal the inode changes
mdr->ls = mdlog->get_current_segment();
f->dump_stream("client_reconnect_gather") << client_reconnect_gather;
f->close_section();
}
+
+/*
+ * Pick the snap-trace encoding appropriate for `session`: clients that
+ * advertise CEPHFS_FEATURE_NEW_SNAPREALM_INFO get the new-format trace,
+ * everyone else gets the legacy encoding.  Both arguments must be
+ * non-null.
+ */
+const bufferlist& Server::get_snap_trace(Session *session, SnapRealm *realm) const {
+ ceph_assert(session);
+ ceph_assert(realm);
+ return session->info.has_feature(CEPHFS_FEATURE_NEW_SNAPREALM_INFO)
+ ? realm->get_snap_trace_new()
+ : realm->get_snap_trace();
+}
+
+/*
+ * Convenience overload: resolve the client's Session from the session
+ * map and dispatch to the Session* variant.
+ *
+ * NOTE(review): get_session() may return nullptr for an unknown client,
+ * and the Session* overload asserts session != nullptr — presumably all
+ * callers pass clients with live sessions; confirm at call sites.
+ */
+const bufferlist& Server::get_snap_trace(client_t client, SnapRealm *realm) const {
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
+ return get_snap_trace(session, realm);
+}