#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
-#include "MDSContext.h"
#include "msg/Messenger.h"
}
void _forward(mds_rank_t t) override {
MDCache* mdcache = server->mdcache;
- mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
+ mdcache->mds->forward_message_mds(mdr, t);
mdr->set_mds_stamp(ceph_clock_now());
for (auto& m : batch_reqs) {
if (!m->killed)
batch_reqs.clear();
server->reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
}
- void print(std::ostream& o) {
+ void print(std::ostream& o) const override {
o << "[batch front=" << *mdr << "]";
}
};
"Request type remove snapshot latency");
plb.add_time_avg(l_mdss_req_renamesnap_latency, "req_renamesnap_latency",
"Request type rename snapshot latency");
+ plb.add_time_avg(l_mdss_req_snapdiff_latency, "req_snapdiff_latency",
+ "Request type snapshot difference latency");
plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
plb.add_u64_counter(l_mdss_dispatch_client_request, "dispatch_client_request",
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
- case CEPH_MSG_CLIENT_REPLY:
- handle_client_reply(ref_cast<MClientReply>(m));
- return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
mdr->pin(dn);
early_reply(mdr, in, dn);
-
+
mdr->committing = true;
submit_mdlog_entry(le, fin, mdr, __func__);
-
+
if (mdr->client_request && mdr->client_request->is_queued_for_replay()) {
if (mds->queue_one_replay()) {
dout(10) << " queued next replay op" << dendl;
} else {
dout(10) << " journaled last replay op" << dendl;
}
- } else if (mdr->did_early_reply) {
+ } else if (mdr->did_early_reply)
mds->locker->drop_rdlocks_for_early_reply(mdr.get());
- if (dn && dn->is_waiter_for(CDentry::WAIT_UNLINK_FINISH))
- mdlog->flush();
- } else {
+ else
mdlog->flush();
- }
}
void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestRef& mdr,
case CEPH_MDS_OP_RENAMESNAP:
code = l_mdss_req_renamesnap_latency;
break;
+ case CEPH_MDS_OP_READDIR_SNAPDIFF:
+ code = l_mdss_req_snapdiff_latency;
+ break;
default:
dout(1) << ": unknown client op" << dendl;
return;
mds->send_message_client(reply, session);
}
- if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
- mds->send_message(reply, mdr->client_request->get_connection());
- }
-
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
// inode
if (in) {
in->encode_inodestat(bl, session, NULL, snapid, 0, mdr->getattr_caps);
- dout(20) << "set_trace_dist added in " << *in << dendl;
+ dout(20) << "set_trace_dist added snap " << snapid << " in " << *in
+ << dendl;
reply->head.is_target = 1;
} else
reply->head.is_target = 0;
return;
}
-void Server::handle_client_reply(const cref_t<MClientReply> &reply)
-{
- dout(4) << "handle_client_reply " << *reply << dendl;
-
- ceph_assert(reply->is_safe());
- ceph_tid_t tid = reply->get_tid();
-
- if (mds->internal_client_requests.count(tid) == 0) {
- dout(1) << " no pending request on tid " << tid << dendl;
- return;
- }
-
- auto &req = mds->internal_client_requests.at(tid);
- CDentry *dn = req.get_dentry();
-
- switch (reply->get_op()) {
- case CEPH_MDS_OP_RENAME:
- if (dn) {
- dn->state_clear(CDentry::STATE_REINTEGRATING);
-
- MDSContext::vec finished;
- dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
- mds->queue_waiters(finished);
- }
- break;
- default:
- dout(5) << " unknown client op " << reply->get_op() << dendl;
- }
-
- mds->internal_client_requests.erase(tid);
-}
-
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
case CEPH_MDS_OP_RENAMESNAP:
handle_client_renamesnap(mdr);
break;
+ case CEPH_MDS_OP_READDIR_SNAPDIFF:
+ handle_client_readdir_snapdiff(mdr);
+ break;
default:
dout(1) << " unknown client op " << req->get_op() << dendl;
_inode->mode |= S_ISGID;
}
} else {
- _inode->gid = mdr->client_request->get_caller_gid();
+ _inode->gid = mdr->client_request->get_owner_gid();
+ ceph_assert(_inode->gid != (unsigned)-1);
}
- _inode->uid = mdr->client_request->get_caller_uid();
+ _inode->uid = mdr->client_request->get_owner_uid();
+ ceph_assert(_inode->uid != (unsigned)-1);
_inode->btime = _inode->ctime = _inode->mtime = _inode->atime =
mdr->get_op_stamp();
} else {
dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
+ mdr->mark_event("joining batch lookup");
return;
}
} else {
} else {
dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
+ mdr->mark_event("joining batch getattr");
return;
}
}
} else if (ref->filelock.is_stable() ||
ref->filelock.get_num_wrlocks() > 0 ||
!ref->filelock.can_read(mdr->get_client())) {
+ /* Since we're taking advantage of an optimization here:
+ *
+ * We cannot suddenly, due to a changing condition, add this filelock as
+ * it can cause lock-order deadlocks. In this case, that condition is the
+ * lock state changes between request retries. If that happens, we need
+ * to check if we've acquired the other locks in this vector. If we have,
+ * then we need to drop those locks and retry.
+ */
+ if (mdr->is_rdlocked(&ref->linklock) ||
+ mdr->is_rdlocked(&ref->authlock) ||
+ mdr->is_rdlocked(&ref->xattrlock)) {
+ /* start over */
+ dout(20) << " dropping locks and restarting request because filelock state change" << dendl;
+ mds->locker->drop_locks(mdr.get());
+ mdr->drop_local_auth_pins();
+ mds->queue_waiter(new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
lov.add_rdlock(&ref->filelock);
mdr->locking_state &= ~MutationImpl::ALL_LOCKED;
}
}
MutationImpl::LockOpVec lov;
+ lov.add_rdlock(&cur->snaplock);
unsigned mask = req->head.args.open.mask;
if (mask) {
if (!dn)
return;
- if (is_unlink_pending(dn)) {
- wait_for_pending_unlink(dn, mdr);
- return;
- }
-
CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (!excl && !dnl->is_null()) {
// it existed.
ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
- MutationImpl::LockOpVec lov;
- lov.add_rdlock(&dnl->get_inode()->snaplock);
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
-
handle_client_open(mdr);
return;
}
}
+/**
+ * Shared tail of the readdir-style handlers: completes the reply blob and
+ * answers the client request with result 0.
+ *
+ * @param mdr      request being answered; reply_extra_bl and tracei are set
+ * @param diri     inode stored as the trace inode of the reply
+ * @param dir      dirfrag whose META_POP_READDIR popularity is bumped
+ * @param start    caller's "listing started at the beginning" indication
+ * @param end      caller's "listing reached the end" indication
+ * @param flags    flags picked by the caller; FRAG_END / FRAG_COMPLETE are
+ *                 OR-ed in here depending on @p start and @p end
+ * @param numfiles number of entries the caller encoded into @p dnbl
+ * @param dirbl    reply header; entry count, flags and @p dnbl get appended
+ * @param dnbl     encoded entries; its content is claimed by @p dirbl
+ */
+void Server::_finalize_readdir(MDRequestRef& mdr,
+                               CInode *diri,
+                               CDir* dir,
+                               bool start,
+                               bool end,
+                               __u16 flags,
+                               __u32 numfiles,
+                               bufferlist& dirbl,
+                               bufferlist& dnbl)
+{
+  const cref_t<MClientRequest> &req = mdr->client_request;
+
+  // account the returned entries on the requesting session
+  mds->get_session(req)->touch_readdir_cap(numfiles);
+
+  if (end) {
+    // FRAG_COMPLETE is only reported together with FRAG_END
+    // FIXME: what purpose does this serve
+    flags |= start ? (CEPH_READDIR_FRAG_END | CEPH_READDIR_FRAG_COMPLETE)
+                   : CEPH_READDIR_FRAG_END;
+  }
+
+  // close the blob: <numfiles> <flags> <entries...>
+  encode(numfiles, dirbl);
+  encode(flags, dirbl);
+  dirbl.claim_append(dnbl);
+
+  dout(10) << "reply to " << *req << " readdir num=" << numfiles
+           << " bytes=" << dirbl.length()
+           << " start=" << static_cast<int>(start)
+           << " end=" << static_cast<int>(end)
+           << dendl;
+  mdr->reply_extra_bl = dirbl;
+
+  // popularity bump. NOTE: this doesn't quite capture it.
+  mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
+
+  // send the reply, tracing the directory inode
+  mdr->tracei = diri;
+  respond_to_request(mdr, 0);
+}
void Server::handle_client_readdir(MDRequestRef& mdr)
{
if (logger)
logger->inc(l_mdss_cap_acquisition_throttle);
+ mdr->mark_event("cap_acquisition_throttle");
mds->timer.add_event_after(caps_throttle_retry_request_timeout, new C_MDS_RetryRequest(mdcache, mdr));
return;
}
dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
break;
}
-
+
unsigned start_len = dnbl.length();
// dentry
mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
// inode
- dout(12) << "including inode " << *in << dendl;
+ dout(12) << "including inode in " << *in << " snap " << snapid << dendl;
int r = in->encode_inodestat(dnbl, mdr->session, realm, snapid, bytes_left - (int)dnbl.length());
if (r < 0) {
// chop off dn->name, lease
// touch dn
mdcache->lru.lru_touch(dn);
}
-
- session->touch_readdir_cap(numfiles);
-
__u16 flags = 0;
- if (end) {
- flags = CEPH_READDIR_FRAG_END;
- if (start)
- flags |= CEPH_READDIR_FRAG_COMPLETE; // FIXME: what purpose does this serve
- }
// client only understand END and COMPLETE flags ?
if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
flags |= CEPH_READDIR_HASH_ORDER | CEPH_READDIR_OFFSET_HASH;
}
-
- // finish final blob
- encode(numfiles, dirbl);
- encode(flags, dirbl);
- dirbl.claim_append(dnbl);
-
- // yay, reply
- dout(10) << "reply to " << *req << " readdir num=" << numfiles
- << " bytes=" << dirbl.length()
- << " start=" << (int)start
- << " end=" << (int)end
- << dendl;
- mdr->reply_extra_bl = dirbl;
-
- // bump popularity. NOTE: this doesn't quite capture it.
- mds->balancer->hit_dir(dir, META_POP_READDIR, numfiles);
-
- // reply
- mdr->tracei = diri;
- respond_to_request(mdr, 0);
+ _finalize_readdir(mdr, diri, dir, start, end, flags, numfiles, dirbl, dnbl);
}
// ------------------------------------------------
-struct C_WaitUnlinkToFinish : public MDSContext {
-protected:
- MDCache *mdcache;
- CDentry *dn;
- MDSContext *fin;
-
- MDSRank *get_mds() override
- {
- ceph_assert(mdcache != NULL);
- return mdcache->mds;
- }
-
-public:
- C_WaitUnlinkToFinish(MDCache *m, CDentry *d, MDSContext *f) :
- mdcache(m), dn(d), fin(f) {}
- void finish(int r) override {
- fin->complete(r);
- dn->put(CDentry::PIN_PURGING);
- }
-};
-
-bool Server::is_unlink_pending(CDentry *dn)
-{
- CDentry::linkage_t *dnl = dn->get_projected_linkage();
- if (!dnl->is_null() && dn->state_test(CDentry::STATE_UNLINKING)) {
- return true;
- }
- return false;
-}
-
-void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
-{
- dout(20) << __func__ << " dn " << *dn << dendl;
- mds->locker->drop_locks(mdr.get());
- auto fin = new C_MDS_RetryRequest(mdcache, mdr);
- dn->get(CDentry::PIN_PURGING);
- dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
-}
-
-struct C_WaitReintegrateToFinish : public MDSContext {
-protected:
- MDCache *mdcache;
- CDentry *dn;
- MDSContext *fin;
-
- MDSRank *get_mds() override
- {
- ceph_assert(mdcache != NULL);
- return mdcache->mds;
- }
-
-public:
- C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
- mdcache(m), dn(d), fin(f) {}
- void finish(int r) override {
- fin->complete(r);
- dn->put(CDentry::PIN_PURGING);
- }
-};
-
-bool Server::is_reintegrate_pending(CDentry *dn)
-{
- CDentry::linkage_t *dnl = dn->get_projected_linkage();
- if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
- return true;
- }
- return false;
-}
-
-void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
-{
- dout(20) << __func__ << " dn " << *dn << dendl;
- mds->locker->drop_locks(mdr.get());
- auto fin = new C_MDS_RetryRequest(mdcache, mdr);
- dn->get(CDentry::PIN_PURGING);
- dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
-}
-
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
if (!dn)
return;
- if (is_unlink_pending(dn)) {
- wait_for_pending_unlink(dn, mdr);
- return;
- }
-
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!check_access(mdr, diri, MAY_WRITE))
if (!dn)
return;
- if (is_unlink_pending(dn)) {
- wait_for_pending_unlink(dn, mdr);
- return;
- }
-
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
if (!dn)
return;
- if (is_unlink_pending(dn)) {
- wait_for_pending_unlink(dn, mdr);
- return;
- }
-
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
targeti = ret.second->get_projected_linkage()->get_inode();
}
- if (is_unlink_pending(destdn)) {
- wait_for_pending_unlink(destdn, mdr);
- return;
- }
-
ceph_assert(destdn->get_projected_linkage()->is_null());
if (req->get_alternate_name().size() > alternate_name_max) {
dout(10) << " alternate_name longer than " << alternate_name_max << dendl;
SnapRealm *target_realm = target_pin->find_snaprealm();
if (target_pin != dir->inode &&
target_realm->get_subvolume_ino() !=
- dir->inode->find_snaprealm()->get_subvolume_ino()) {
- if (target_pin->is_stray()) {
- mds->locker->drop_locks(mdr.get());
- targeti->add_waiter(CInode::WAIT_UNLINK,
- new C_MDS_RetryRequest(mdcache, mdr));
- mdlog->flush();
- return;
- }
+ dir->inode->find_snaprealm()->get_subvolume_ino() &&
+ /* The inode is temporarily located in the stray dir pending reintegration */
+ !target_pin->is_stray()) {
dout(7) << "target is in different subvolume, failing..." << dendl;
respond_to_request(mdr, -CEPHFS_EXDEV);
return;
mdr->apply();
MDRequestRef null_ref;
- if (inc) {
+ if (inc)
mdcache->send_dentry_link(dn, null_ref);
- } else {
- dn->state_clear(CDentry::STATE_UNLINKING);
+ else
mdcache->send_dentry_unlink(dn, NULL, null_ref);
-
- MDSContext::vec finished;
- dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
- mdcache->mds->queue_waiters(finished);
- }
-
+
// bump target popularity
mds->balancer->hit_inode(targeti, META_POP_IWR);
mds->balancer->hit_dir(dn->get_dir(), META_POP_IWR);
if (rmdir)
mdr->disable_lock_cache();
-
CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
if (!dn)
return;
- if (is_reintegrate_pending(dn)) {
- wait_for_pending_reintegrate(dn, mdr);
- return;
- }
-
- // notify replica MDSes the dentry is under unlink
- if (!dn->state_test(CDentry::STATE_UNLINKING)) {
- dn->state_set(CDentry::STATE_UNLINKING);
- mdcache->send_dentry_unlink(dn, nullptr, mdr, true);
- if (dn->replica_unlinking_ref) {
- return;
- }
- }
-
CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
ceph_assert(!dnl->is_null());
CInode *in = dnl->get_inode();
if (rmdir) {
// do empty directory checks
if (_dir_is_nonempty_unlocked(mdr, in)) {
- dn->state_clear(CDentry::STATE_UNLINKING);
- respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
+ respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
return;
}
} else {
dout(7) << "handle_client_unlink on dir " << *in << ", returning error" << dendl;
- dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_EISDIR);
return;
}
if (rmdir) {
// unlink
dout(7) << "handle_client_rmdir on non-dir " << *in << ", returning error" << dendl;
- dn->state_clear(CDentry::STATE_UNLINKING);
respond_to_request(mdr, -CEPHFS_ENOTDIR);
return;
}
CInode *diri = dn->get_dir()->get_inode();
if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
- if (!check_access(mdr, diri, MAY_WRITE)) {
- dn->state_clear(CDentry::STATE_UNLINKING);
+ if (!check_access(mdr, diri, MAY_WRITE))
return;
- }
}
// -- create stray dentry? --
if (in->is_dir() &&
_dir_is_nonempty(mdr, in)) {
respond_to_request(mdr, -CEPHFS_ENOTEMPTY);
- dn->state_clear(CDentry::STATE_UNLINKING);
return;
}
}
mdr->apply();
-
- dn->state_clear(CDentry::STATE_UNLINKING);
+
mdcache->send_dentry_unlink(dn, straydn, mdr);
-
- MDSContext::vec finished;
- dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
- mdcache->mds->queue_waiters(finished);
-
+
if (straydn) {
// update subtree map?
if (strayin->is_dir())
// reply
respond_to_request(mdr, 0);
-
+
// removing a new dn?
dn->get_dir()->try_remove_unlinked_dn(dn);
if (!destdn)
return;
- if (is_unlink_pending(destdn)) {
- wait_for_pending_unlink(destdn, mdr);
- return;
- }
-
- if (is_unlink_pending(srcdn)) {
- wait_for_pending_unlink(srcdn, mdr);
- return;
- }
-
dout(10) << " destdn " << *destdn << dendl;
CDir *destdir = destdn->get_dir();
ceph_assert(destdir->is_auth());
C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn);
journal_and_reply(mdr, srci, destdn, le, fin);
-
- // trigger to flush mdlog in case reintegrating or migrating the stray dn,
- // because the link requests maybe waiting.
- if (srcdn->get_dir()->inode->is_stray()) {
- mdlog->flush();
- }
mds->balancer->maybe_fragment(destdn->get_dir(), false);
}
// primary+remote link merge?
bool linkmerge = (srcdnl->get_inode() == oldin);
if (linkmerge)
- ceph_assert(srcdnl->is_primary() || destdnl->is_remote());
+ ceph_assert(srcdnl->is_primary() && destdnl->is_remote());
bool new_in_snaprealm = false;
bool new_oldin_snaprealm = false;
srcdn->get_dir()->unlink_inode(srcdn);
- // After the stray dn being unlinked from the corresponding inode in case of
- // reintegrate_stray/migrate_stray, just wake up the waitiers.
- MDSContext::vec finished;
- in->take_waiting(CInode::WAIT_UNLINK, finished);
- if (!finished.empty()) {
- mds->queue_waiters(finished);
- }
-
// dest
if (srcdn_was_remote) {
if (!linkmerge) {
return;
}
- snapid_t snapid = diri->snaprealm->resolve_snapname(srcname, diri->ino());
+ snapid_t snapid = diri->snaprealm->resolve_snapname(srcname, diri->ino());
+
dout(10) << " snapname " << srcname << " is " << snapid << dendl;
// lock snap
respond_to_request(mdr, 0);
}
+/**
+ * Handle a CEPH_MDS_OP_READDIR_SNAPDIFF client request.
+ *
+ * Steps, in order:
+ *  - resolve the target inode and make sure it is a directory,
+ *  - apply the cap acquisition throttle (the request is retried later),
+ *  - rdlock filelock/dirfragtreelock and check read access,
+ *  - pick the dirfrag to list, adjusting it if the requested frag no longer
+ *    matches the dirfragtree, and fetch it if it is incomplete,
+ *  - validate the pair of snapshot ids,
+ *  - encode the dirstat header and let _readdir_diff() produce the entries
+ *    and send the reply.
+ *
+ * Each early return below either follows an explicit reply/retry, or follows
+ * a helper returning failure (presumably after the helper itself replied or
+ * queued a retry -- the helpers are not visible here).
+ *
+ * @param mdr the request to serve
+ */
+void Server::handle_client_readdir_snapdiff(MDRequestRef& mdr)
+{
+  const cref_t<MClientRequest>& req = mdr->client_request;
+  Session* session = mds->get_session(req);
+  MutationImpl::LockOpVec lov;
+  CInode* diri = rdlock_path_pin_ref(mdr, false, true);
+  if (!diri) return;
+
+  // it's a directory, right?
+  if (!diri->is_dir()) {
+    // not a dir
+    dout(10) << "reply to " << *req << " snapdiff -CEPHFS_ENOTDIR" << dendl;
+    respond_to_request(mdr, -CEPHFS_ENOTDIR);
+    return;
+  }
+
+  auto num_caps = session->get_num_caps();
+  auto session_cap_acquisition = session->get_cap_acquisition();
+
+  if (num_caps > static_cast<uint64_t>(max_caps_per_client * max_caps_throttle_ratio) && session_cap_acquisition >= cap_acquisition_throttle) {
+    dout(20) << "snapdiff throttled. max_caps_per_client: " << max_caps_per_client << " num_caps: " << num_caps
+             << " session_cap_acquistion: " << session_cap_acquisition << " cap_acquisition_throttle: " << cap_acquisition_throttle << dendl;
+    if (logger)
+      logger->inc(l_mdss_cap_acquisition_throttle);
+
+    // record the throttling on the request, same as the readdir throttle path
+    mdr->mark_event("cap_acquisition_throttle");
+    mds->timer.add_event_after(caps_throttle_retry_request_timeout, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
+  lov.add_rdlock(&diri->filelock);
+  lov.add_rdlock(&diri->dirfragtreelock);
+
+  if (!mds->locker->acquire_locks(mdr, lov))
+    return;
+
+  if (!check_access(mdr, diri, MAY_READ))
+    return;
+
+  // which frag?
+  frag_t fg = (__u32)req->head.args.snapdiff.frag;
+  unsigned req_flags = (__u32)req->head.args.snapdiff.flags;
+  string offset_str = req->get_path2();
+
+  // a name offset takes precedence over the raw hash sent by the client
+  __u32 offset_hash = 0;
+  if (!offset_str.empty()) {
+    offset_hash = ceph_frag_value(diri->hash_dentry_name(offset_str));
+  } else {
+    offset_hash = (__u32)req->head.args.snapdiff.offset_hash;
+  }
+
+  dout(10) << " frag " << fg << " offset '" << offset_str << "'"
+           << " offset_hash " << offset_hash << " flags " << req_flags << dendl;
+
+  // does the frag exist?
+  if (diri->dirfragtree[fg.value()] != fg) {
+    frag_t newfg;
+    if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
+      if (fg.contains((unsigned)offset_hash)) {
+        newfg = diri->dirfragtree[offset_hash];
+      } else {
+        // client actually wants next frag
+        newfg = diri->dirfragtree[fg.value()];
+      }
+    } else {
+      // the name offset is dropped together with the stale frag
+      offset_str.clear();
+      newfg = diri->dirfragtree[fg.value()];
+    }
+    dout(10) << " adjust frag " << fg << " -> " << newfg << " " << diri->dirfragtree << dendl;
+    fg = newfg;
+  }
+
+  CDir* dir = try_open_auth_dirfrag(diri, fg, mdr);
+  if (!dir) return;
+
+  // ok!
+  dout(10) << __func__<< " on " << *dir << dendl;
+  ceph_assert(dir->is_auth());
+
+  if (!dir->is_complete()) {
+    if (dir->is_frozen()) {
+      dout(7) << "dir is frozen " << *dir << dendl;
+      mds->locker->drop_locks(mdr.get());
+      mdr->drop_local_auth_pins();
+      dir->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
+      return;
+    }
+    // fetch
+    dout(10) << " incomplete dir contents for snapdiff on " << *dir << ", fetching" << dendl;
+    dir->fetch(new C_MDS_RetryRequest(mdcache, mdr), true);
+    return;
+  }
+
+#ifdef MDS_VERIFY_FRAGSTAT
+  dir->verify_fragstat();
+#endif
+
+  utime_t now = ceph_clock_now();
+  mdr->set_mds_stamp(now);
+
+  // both snapshot ids must be real snapshots and must differ
+  mdr->snapid_diff_other = (uint64_t)req->head.args.snapdiff.snap_other;
+  if (mdr->snapid_diff_other == mdr->snapid ||
+      mdr->snapid == CEPH_NOSNAP ||
+      mdr->snapid_diff_other == CEPH_NOSNAP) {
+    dout(10) << "reply to " << *req << " snapdiff -CEPHFS_EINVAL" << dendl;
+    respond_to_request(mdr, -CEPHFS_EINVAL);
+    // the request has been answered; do not fall through and reply again
+    return;
+  }
+
+  dout(10) << __func__
+           << " snap " << mdr->snapid
+           << " vs. snap " << mdr->snapid_diff_other
+           << dendl;
+
+  SnapRealm* realm = diri->find_snaprealm();
+
+  unsigned max = req->head.args.snapdiff.max_entries;
+  if (!max)
+    max = dir->get_num_any();  // whatever, something big.
+  unsigned max_bytes = req->head.args.snapdiff.max_bytes;
+  if (!max_bytes)
+    // make sure at least one item can be encoded
+    max_bytes = (512 << 10) + g_conf()->mds_max_xattr_pairs_size;
+
+  // start final blob
+  bufferlist dirbl;
+  DirStat ds;
+  ds.frag = dir->get_frag();
+  ds.auth = dir->get_dir_auth().first;
+  if (dir->is_auth() && !forward_all_requests_to_auth)
+    dir->get_dist_spec(ds.dist, mds->get_nodeid());
+
+  dir->encode_dirstat(dirbl, mdr->session->info, ds);
+
+  // count bytes available.
+  // this isn't perfect, but we should capture the main variable/unbounded size items!
+  int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8) * 2;
+  int bytes_left = max_bytes - front_bytes;
+  bytes_left -= get_snap_trace(session, realm).length();
+
+  _readdir_diff(
+    now,
+    mdr,
+    diri,
+    dir,
+    realm,
+    max,
+    bytes_left,
+    offset_str,
+    offset_hash,
+    req_flags,
+    dirbl);
+}
+
+
/**
* Return true if server is in state RECONNECT and this
* client has not yet reconnected.
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
return get_snap_trace(session, realm);
}
+
+/**
+ * Encode the snapdiff entries of @p dir and send the reply.
+ *
+ * The two snapshot ids are taken from @p mdr (snapid / snapid_diff_other) and
+ * ordered so that the smaller id is the "previous" one. build_snap_diff()
+ * selects the entries; the callback defined here encodes name, lease and
+ * inode stat of every selected entry into a local buffer, and
+ * _finalize_readdir() completes and sends the reply.
+ *
+ * @param now         timestamp handed to issue_client_lease()
+ * @param mdr         request being served
+ * @param diri        directory inode, forwarded to _finalize_readdir()
+ * @param dir         dirfrag to scan
+ * @param realm       snap realm handed to encode_inodestat()
+ * @param max_entries not used by this function
+ * @param bytes_left  byte budget for the encoded entries
+ * @param offset_str  name to resume after; empty if none
+ * @param offset_hash hash to resume after; 0 if none
+ * @param req_flags   request flags; REPLY_BITFLAGS selects the hash-order flags
+ * @param dirbl       pre-filled reply header, forwarded to _finalize_readdir()
+ */
+void Server::_readdir_diff(
+  utime_t now,
+  MDRequestRef& mdr,
+  CInode* diri,
+  CDir* dir,
+  SnapRealm* realm,
+  unsigned max_entries,
+  int bytes_left,
+  const string& offset_str,
+  uint32_t offset_hash,
+  unsigned req_flags,
+  bufferlist& dirbl)
+{
+  // encoded entries and their count
+  bufferlist dnbl;
+  __u32 numfiles = 0;
+
+  // make sure snapid_prev holds the smaller of the two snapshot ids
+  snapid_t snapid_prev = mdr->snapid_diff_other;
+  snapid_t snapid = mdr->snapid;
+  if (snapid < snapid_prev) {
+    std::swap(snapid, snapid_prev);
+  }
+
+  const bool from_the_beginning = offset_str.empty() && !offset_hash;
+
+  // resume key built from (snapid_prev, offset_str, offset_hash); only
+  // handed to build_snap_diff() when an offset was supplied
+  dentry_key_t skip_key(snapid_prev, offset_str.c_str(), offset_hash);
+
+  // encodes one entry; returns false once the byte budget is exhausted
+  auto encode_entry = [&](CDentry* dn, CInode* in, bool exists) {
+    const auto& dn_name = dn->get_name();
+    // provide the first snapid for removed entries and
+    // the last one for existent ones
+    const snapid_t effective_snapid = exists ? snapid : snapid_prev;
+    string name;
+    name.append(dn_name);
+
+    const int needed =
+      (int)(dnbl.length() + name.length() + sizeof(__u32) + sizeof(LeaseStat));
+    if (needed > bytes_left) {
+      dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
+      return false;
+    }
+
+    auto dir_inode = dir->get_inode();
+    auto hash = ceph_frag_value(dir_inode->hash_dentry_name(dn_name));
+    // remember where this entry starts so it can be rolled back
+    const unsigned start_len = dnbl.length();
+    dout(10) << "inc dn " << *dn << " as " << name
+             << std::hex << " hash 0x" << hash << std::dec
+             << dendl;
+    encode(name, dnbl);
+    mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
+
+    // inode
+    dout(10) << "inc inode " << *in << " snap " << effective_snapid << dendl;
+    const int r = in->encode_inodestat(dnbl, mdr->session, realm, effective_snapid,
+                                       bytes_left - (int)dnbl.length());
+    if (r < 0) {
+      // roll back the name and lease that were already appended
+      dout(10) << " ran out of room, stopping at "
+               << start_len << " < " << bytes_left << dendl;
+      bufferlist keep;
+      keep.substr_of(dnbl, 0, start_len);
+      dnbl.swap(keep);
+      return false;
+    }
+
+    // touch dn
+    mdcache->lru.lru_touch(dn);
+    ++numfiles;
+    return true;
+  };
+
+  const bool end = build_snap_diff(
+    mdr,
+    dir,
+    bytes_left,
+    from_the_beginning ? nullptr : &skip_key,
+    snapid_prev,
+    snapid,
+    dnbl,
+    encode_entry);
+
+  __u16 flags = 0;
+  if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
+    flags |= CEPH_READDIR_HASH_ORDER | CEPH_READDIR_OFFSET_HASH;
+  }
+
+  // we want opponent snapid to be used for tracei
+  std::swap(mdr->snapid, mdr->snapid_diff_other);
+
+  _finalize_readdir(mdr, diri, dir, from_the_beginning, end, flags, numfiles,
+                    dirbl, dnbl);
+}
+
+/**
+ * Scan the dentries of @p dir and report, through @p add_result_cb, the ones
+ * that differ between snapshots @p snapid_prev and @p snapid.
+ *
+ * Directories are reported as soon as they are seen (flagged as not existing
+ * when their dentry covers @p snapid_prev but ends before @p snapid).
+ * For non-directories the dentry covering @p snapid_prev is remembered in
+ * `before` and compared with a following dentry of the same name: equal mtime
+ * means "unchanged" and nothing is reported; otherwise the later dentry is
+ * reported as existing. A remembered dentry without a same-named successor is
+ * reported as deleted.
+ *
+ * @param mdr           request being served
+ * @param dir           dirfrag to scan
+ * @param bytes_left    only used in log output here
+ * @param skip_key      optional resume key; dentries whose key is not greater
+ *                      are skipped. Its snapid field is overwritten while
+ *                      scanning. nullptr scans from the start of the frag.
+ * @param snapid_prev   first snapshot id (callers in this file pass the
+ *                      smaller of the two ids)
+ * @param snapid        second snapshot id
+ * @param dnbl          caller's output buffer; only its length is consulted
+ * @param add_result_cb invoked as (dentry, inode, exists); returning false
+ *                      stops the scan
+ * @return true only if the end of the dirfrag was reached and no entry was
+ *         rejected by @p add_result_cb; false otherwise
+ */
+bool Server::build_snap_diff(
+  MDRequestRef& mdr,
+  CDir* dir,
+  int bytes_left,
+  dentry_key_t* skip_key,
+  snapid_t snapid_prev,
+  snapid_t snapid,
+  const bufferlist& dnbl,
+  std::function<bool (CDentry*, CInode*, bool)> add_result_cb)
+{
+  client_t client = mdr->client_request->get_source().num();
+
+  // candidate "before" entry (the dentry covering snapid_prev)
+  struct EntryInfo {
+    CDentry* dn = nullptr;
+    CInode* in = nullptr;
+    utime_t mtime;
+
+    void reset() {
+      *this = EntryInfo();
+    }
+  } before;
+
+  // Set when the callback rejected an entry. The iterator is advanced at the
+  // top of the loop, so "it == dir->end()" alone cannot tell a complete scan
+  // from one that was cut short on the last dentry.
+  bool truncated = false;
+
+  // reports ei as deleted and clears it; returns the callback's verdict
+  auto insert_deleted = [&](EntryInfo& ei) {
+    dout(20) << "build_snap_diff deleted file " << ei.dn->get_name() << " "
+             << ei.dn->first << "/" << ei.dn->last << dendl;
+    bool r = add_result_cb(ei.dn, ei.in, false);
+    ei.reset();
+    return r;
+  };
+
+  auto it = !skip_key ? dir->begin() : dir->lower_bound(*skip_key);
+
+  while(it != dir->end()) {
+    CDentry* dn = it->second;
+    dout(20) << __func__ << " " << it->first << "->" << *dn << dendl;
+    ++it;
+    if (dn->state_test(CDentry::STATE_PURGING))
+      continue;
+
+    bool dnp = dn->use_projected(client, mdr);
+    CDentry::linkage_t* dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
+
+    if (dnl->is_null()) {
+      dout(20) << __func__ << " linkage is null, skipping" << dendl;
+      continue;
+    }
+
+    // the dentry's [first,last] range must overlap [snapid_prev,snapid]
+    if (dn->last < snapid_prev || dn->first > snapid) {
+      dout(20) << __func__ << " not in range, skipping" << dendl;
+      continue;
+    }
+    if (skip_key) {
+      // compare with the snapid component aligned to this dentry
+      skip_key->snapid = dn->last;
+      if (!(*skip_key < dn->key()))
+        continue;
+    }
+
+    CInode* in = dnl->get_inode();
+    if (in && in->ino() == CEPH_INO_CEPH)
+      continue;
+
+    // remote link?
+    // better for the MDS to do the work, if we think the client will stat any of these files.
+    if (dnl->is_remote() && !in) {
+      in = mdcache->get_inode(dnl->get_remote_ino());
+      if (in) {
+        // only dereference the inode once it is known to be in cache
+        dout(20) << __func__ << " remote in: " << *in << " ino " << std::hex << dnl->get_remote_ino() << std::dec << dendl;
+        dn->link_remote(dnl, in);
+      } else if (dn->state_test(CDentry::STATE_BADREMOTEINO)) {
+        dout(10) << "skipping bad remote ino on " << *dn << dendl;
+        continue;
+      } else {
+        dout(20) << __func__ << " remote ino " << std::hex << dnl->get_remote_ino() << std::dec
+                 << " not in cache" << dendl;
+        // touch everything i _do_ have
+        for (auto& p : *dir) {
+          if (!p.second->get_linkage()->is_null())
+            mdcache->lru.lru_touch(p.second);
+        }
+
+        // already issued caps and leases, reply immediately.
+        if (dnbl.length() > 0) {
+          mdcache->open_remote_dentry(dn, dnp, new C_MDSInternalNoop);
+          dout(10) << " open remote dentry after caps were issued, stopping at "
+                   << dnbl.length() << " < " << bytes_left << dendl;
+        } else {
+          // NOTE(review): a retry is queued here, yet _readdir_diff() still
+          // finalizes and replies after a false result -- confirm the request
+          // cannot be answered twice.
+          mds->locker->drop_locks(mdr.get());
+          mdr->drop_local_auth_pins();
+          mdcache->open_remote_dentry(dn, dnp, new C_MDS_RetryRequest(mdcache, mdr));
+        }
+        return false;
+      }
+    }
+    ceph_assert(in);
+
+    utime_t mtime = in->get_inode()->mtime;
+
+    if (in->is_dir()) {
+
+      // we need to maintain the order of entries (determined by their name hashes)
+      // hence need to insert the previous entry if any immediately.
+      if (before.dn) {
+        if (!insert_deleted(before)) {
+          truncated = true;
+          break;
+        }
+      }
+
+      bool exists = true;
+      if (snapid_prev < dn->first && dn->last < snapid) {
+        dout(20) << __func__ << " skipping inner " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last << dendl;
+        continue;
+      } else if (dn->first <= snapid_prev && dn->last < snapid) {
+        // dir deleted
+        dout(20) << __func__ << " deleted dir " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last << dendl;
+        exists = false;
+      }
+      bool r = add_result_cb(dn, in, exists);
+      if (!r) {
+        truncated = true;
+        break;
+      }
+    } else {
+      if (snapid_prev >= dn->first && snapid <= dn->last) {
+        dout(20) << __func__ << " skipping unchanged " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last << dendl;
+        continue;
+      } else if (snapid_prev < dn->first && snapid > dn->last) {
+        dout(20) << __func__ << " skipping inner modification " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last << dendl;
+        continue;
+      }
+      string_view name_before =
+        before.dn ? string_view(before.dn->get_name()) : string_view();
+      // a pending "before" entry with a different name has no successor:
+      // report it as deleted (insert_deleted also clears it)
+      if (before.dn && dn->get_name() != name_before) {
+        if (!insert_deleted(before)) {
+          truncated = true;
+          break;
+        }
+      }
+      if (snapid_prev >= dn->first && snapid_prev <= dn->last) {
+        dout(30) << __func__ << " dn_before " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last << dendl;
+        before = EntryInfo {dn, in, mtime};
+        continue;
+      } else {
+        if (before.dn && dn->get_name() == name_before) {
+          if (mtime == before.mtime) {
+            dout(30) << __func__ << " timestamp not changed " << dn->get_name() << " "
+                     << dn->first << "/" << dn->last
+                     << " " << mtime
+                     << dendl;
+            before.reset();
+            continue;
+          } else {
+            dout(30) << __func__ << " timestamp changed " << dn->get_name() << " "
+                     << dn->first << "/" << dn->last
+                     << " " << before.mtime << " vs. " << mtime
+                     << dendl;
+            before.reset();
+          }
+        }
+        dout(20) << __func__ << " new file " << dn->get_name() << " "
+                 << dn->first << "/" << dn->last
+                 << dendl;
+        ceph_assert(snapid >= dn->first && snapid <= dn->last);
+      }
+      if (!add_result_cb(dn, in, true)) {
+        truncated = true;
+        break;
+      }
+    }
+  }
+  // flush the last pending "before" entry; if it does not fit either, the
+  // listing is not complete
+  if (!truncated && before.dn) {
+    truncated = !insert_deleted(before);
+  }
+  return !truncated && it == dir->end();
+}