#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
+#include "MDSContext.h"
#include "msg/Messenger.h"
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
+ case CEPH_MSG_CLIENT_REPLY:
+ handle_client_reply(ref_cast<MClientReply>(m));
+ return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
mds->send_message_client(reply, session);
}
+ if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
+ mds->send_message(reply, mdr->client_request->get_connection());
+ }
+
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
return;
}
+void Server::handle_client_reply(const cref_t<MClientReply> &reply)
+{
+ dout(4) << "handle_client_reply " << *reply << dendl;
+
+ ceph_assert(reply->is_safe());
+ ceph_tid_t tid = reply->get_tid();
+
+ if (mds->internal_client_requests.count(tid) == 0) {
+ dout(1) << " no pending request on tid " << tid << dendl;
+ return;
+ }
+
+ auto &req = mds->internal_client_requests.at(tid);
+ CDentry *dn = req.get_dentry();
+
+ switch (reply->get_op()) {
+ case CEPH_MDS_OP_RENAME:
+ if (dn) {
+ dn->state_clear(CDentry::STATE_REINTEGRATING);
+
+ MDSContext::vec finished;
+ dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
+ mds->queue_waiters(finished);
+ }
+ break;
+ default:
+ dout(5) << " unknown client op " << reply->get_op() << dendl;
+ }
+
+ mds->internal_client_requests.erase(tid);
+}
+
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
// while session is opening.
bool allow_prealloc_inos = mdr->session->is_open();
+ inodeno_t _useino = useino;
+
// assign ino
- if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(useino))) {
- mds->sessionmap.mark_projected(mdr->session);
- dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
- << " (" << mdr->session->info.prealloc_inos.size() << " left)"
- << dendl;
- } else {
- mdr->alloc_ino =
- _inode->ino = mds->inotable->project_alloc_id(useino);
- dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
- }
+ do {
+ if (allow_prealloc_inos && (mdr->used_prealloc_ino = _inode->ino = mdr->session->take_ino(_useino))) {
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ mds->sessionmap.mark_projected(mdr->session);
+ dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
+ << " (" << mdr->session->info.prealloc_inos.size() << " left)"
+ << dendl;
+ }
+ } else {
+ mdr->alloc_ino =
+ _inode->ino = mds->inotable->project_alloc_id(_useino);
+ if (mdcache->test_and_clear_taken_inos(_inode->ino)) {
+ mds->inotable->apply_alloc_id(_inode->ino);
+ _inode->ino = 0;
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino
+ << " but has been taken, will try again!" << dendl;
+ } else {
+ dout(10) << "prepare_new_inode alloc " << mdr->alloc_ino << dendl;
+ }
+ }
+ _useino = 0;
+ } while (!_inode->ino);
if (useino && useino != _inode->ino) {
dout(0) << "WARNING: client specified " << useino << " and i allocated " << _inode->ino << dendl;
<< " but mds." << mds->get_nodeid() << " allocated " << _inode->ino;
//ceph_abort(); // just for now.
}
-
+
if (allow_prealloc_inos &&
mdr->session->get_num_projected_prealloc_inos() < g_conf()->mds_client_prealloc_inos / 2) {
int need = g_conf()->mds_client_prealloc_inos - mdr->session->get_num_projected_prealloc_inos();
CInode *diri = dir->get_inode();
if (!mdr->reqid.name.is_mds()) {
- if (diri->is_system() && !diri->is_root()) {
+ if (diri->is_system() && !diri->is_root() &&
+ (!diri->is_lost_and_found() ||
+ mdr->client_request->get_op() != CEPH_MDS_OP_UNLINK)) {
respond_to_request(mdr, -CEPHFS_EROFS);
return nullptr;
}
void finish(int r) override {
ceph_assert(r == 0);
+ // crash current MDS and the replacing MDS will test the journal
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
dn->pop_projected_linkage();
// dirty inode, dn, dir
dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
}
+struct C_WaitReintegrateToFinish : public MDSContext {
+protected:
+ MDCache *mdcache;
+ CDentry *dn;
+ MDSContext *fin;
+
+ MDSRank *get_mds() override
+ {
+ ceph_assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+
+public:
+ C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
+ mdcache(m), dn(d), fin(f) {}
+ void finish(int r) override {
+ fin->complete(r);
+ dn->put(CDentry::PIN_PURGING);
+ }
+};
+
+bool Server::is_reintegrate_pending(CDentry *dn)
+{
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
+ if (!dnl->is_null() && dn->state_test(CDentry::STATE_REINTEGRATING)) {
+ return true;
+ }
+ return false;
+}
+
+void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
+{
+ dout(20) << __func__ << " dn " << *dn << dendl;
+ mds->locker->drop_locks(mdr.get());
+ auto fin = new C_MDS_RetryRequest(mdcache, mdr);
+ dn->get(CDentry::PIN_PURGING);
+ dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, new C_WaitReintegrateToFinish(mdcache, dn, fin));
+}
+
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
void finish(int r) override {
ceph_assert(r == 0);
+ // crash current MDS and the replacing MDS will test the journal
+ ceph_assert(!g_conf()->mds_kill_skip_replaying_inotable);
+
// link the inode
dn->pop_projected_linkage();
journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi));
mds->balancer->maybe_fragment(dir, false);
+
+ // flush the journal as soon as possible
+ if (g_conf()->mds_kill_skip_replaying_inotable) {
+ mdlog->flush();
+ }
}
if (!dn)
return;
+ if (is_reintegrate_pending(dn)) {
+ wait_for_pending_reintegrate(dn, mdr);
+ return;
+ }
+
// notify replica MDSes the dentry is under unlink
if (!dn->state_test(CDentry::STATE_UNLINKING)) {
dn->state_set(CDentry::STATE_UNLINKING);