// open a session?
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
-
+ if (session->state == MetaSession::STATE_REJECTED) {
+ request->abort(-EPERM);
+ break;
+ }
// wait
if (session->state == MetaSession::STATE_OPENING) {
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
- // Abort requests on REJECT from MDS
- if (rejected_by_mds.count(mds)) {
- request->abort(-EPERM);
- break;
- }
continue;
}
ceph_assert(em.second); /* not already present */
MetaSession *session = &em.first->second;
- // Maybe skip sending a request to open if this MDS daemon
- // has previously sent us a REJECT.
- if (rejected_by_mds.count(mds)) {
- if (rejected_by_mds[mds] == session->addrs) {
- ldout(cct, 4) << __func__ << " mds." << mds << " skipping "
- "because we were rejected" << dendl;
- return session;
- } else {
- ldout(cct, 4) << __func__ << " mds." << mds << " old inst "
- "rejected us, trying with new inst" << dendl;
- rejected_by_mds.erase(mds);
- }
- }
-
auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
m->metadata = metadata;
m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}
// Tear down client-side state for an MDS session that has closed.
// Diff hunk: '-' lines are the old single-argument form; '+' lines take
// (err, rejected) so the caller can say *why* the session went away.
// `err` is forwarded to remove_session_caps() so cap holders see the cause;
// `rejected` parks the session in STATE_REJECTED (instead of STATE_CLOSED)
// so we do not immediately retry an MDS that refused us.
-void Client::_closed_mds_session(MetaSession *s)
+void Client::_closed_mds_session(MetaSession *s, int err, bool rejected)
{
ldout(cct, 5) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
- s->state = MetaSession::STATE_CLOSED;
+ if (rejected && s->state != MetaSession::STATE_CLOSING)
+ s->state = MetaSession::STATE_REJECTED;
+ else
+ s->state = MetaSession::STATE_CLOSED;
// Drop the transport and wake anyone blocked waiting for this session to open.
s->con->mark_down();
signal_context_list(s->waiting_for_open);
mount_cond.notify_all();
// Propagate err so released capabilities carry the failure reason.
- remove_session_caps(s);
+ remove_session_caps(s, err);
kick_requests_closed(s);
// Rejected sessions stay in mds_sessions (so the rejection is remembered);
// only cleanly closed ones are erased. Either way the rank is no longer
// "closing", so drop it from mds_ranks_closing.
- mds_sessions.erase(s->mds_num);
+ mds_ranks_closing.erase(s->mds_num);
+ if (s->state == MetaSession::STATE_CLOSED)
+ mds_sessions.erase(s->mds_num);
}
void Client::handle_client_session(const MConstRef<MClientSession>& m)
if (!missing_features.empty()) {
lderr(cct) << "mds." << from << " lacks required features '"
<< missing_features << "', closing session " << dendl;
- rejected_by_mds[session->mds_num] = session->addrs;
_close_mds_session(session);
- _closed_mds_session(session);
+ _closed_mds_session(session, -EPERM, true);
break;
}
session->mds_features = std::move(m->supported_features);
error_str = "unknown error";
lderr(cct) << "mds." << from << " rejected us (" << error_str << ")" << dendl;
- rejected_by_mds[session->mds_num] = session->addrs;
- _closed_mds_session(session);
+ _closed_mds_session(session, -EPERM, true);
}
break;
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
MetaSession &s = it->second;
+ if (s.state == MetaSession::STATE_REJECTED) {
+ mds_sessions.erase(it++);
+ continue;
+ }
++it;
if (s.state == MetaSession::STATE_STALE)
_closed_mds_session(&s);
}
}
-int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
+int Client::get_caps(Fh *fh, int need, int want, int *phave, loff_t endoff)
{
+ Inode *in = fh->inode.get();
+
int r = check_pool_perm(in, need);
if (r < 0)
return r;
return -EBADF;
}
+ if ((fh->mode & CEPH_FILE_MODE_WR) && fh->gen != fd_gen)
+ return -EBADF;
+
+ if ((in->flags & I_ERROR_FILELOCK) && fh->has_any_filelocks())
+ return -EIO;
+
int implemented;
int have = in->caps_issued(&implemented);
remove_cap(&in->caps.begin()->second, true);
}
-void Client::remove_session_caps(MetaSession *s)
+void Client::remove_session_caps(MetaSession *s, int err)
{
ldout(cct, 10) << __func__ << " mds." << s->mds_num << dendl;
dirty_caps = in->dirty_caps | in->flushing_caps;
in->wanted_max_size = 0;
in->requested_max_size = 0;
+ if (in->has_any_filelocks())
+ in->flags |= I_ERROR_FILELOCK;
}
+ auto caps = cap->implemented;
if (cap->wanted | cap->issued)
in->flags |= I_CAP_DROPPED;
remove_cap(cap, false);
in->mark_caps_clean();
put_inode(in.get());
}
+ caps &= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER;
+ if (caps && !in->caps_issued_mask(caps, true)) {
+ if (err == -EBLACKLISTED) {
+ if (in->oset.dirty_or_tx) {
+ lderr(cct) << __func__ << " still has dirty data on " << *in << dendl;
+ in->set_async_err(err);
+ }
+ objectcacher->purge_set(&in->oset);
+ } else {
+ objectcacher->release_set(&in->oset);
+ }
+ _schedule_invalidate_callback(in.get(), 0, 0);
+ }
+
signal_cond_list(in->waitfor_caps);
}
s->flushing_caps_tids.clear();
// Close every open MDS session as part of unmount/shutdown.
// Diff hunk: the '+' lines add (a) pruning of REJECTED sessions up front
// (no close handshake will ever come back from those), (b) tracking of
// in-flight closes in mds_ranks_closing, and (c) a bounded wait governed
// by the "client_shutdown_timeout" option instead of an indefinite wait.
void Client::_close_sessions()
{
// REJECTED sessions were deliberately kept in mds_sessions (see
// _closed_mds_session); drop them now so the loop below can terminate.
+ for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
+ if (it->second.state == MetaSession::STATE_REJECTED)
+ mds_sessions.erase(it++);
+ else
+ ++it;
+ }
+
while (!mds_sessions.empty()) {
// send session closes!
for (auto &p : mds_sessions) {
if (p.second.state != MetaSession::STATE_CLOSING) {
_close_mds_session(&p.second);
// Remember which ranks we are waiting on a close-ack from.
+ mds_ranks_closing.insert(p.first);
}
}
// wait for sessions to close
- ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
+ double timo = cct->_conf.get_val<std::chrono::seconds>("client_shutdown_timeout").count();
+ ldout(cct, 2) << "waiting for " << mds_ranks_closing.size() << " mds session(s) to close (timeout: "
+ << timo << "s)" << dendl;
std::unique_lock l{client_lock, std::adopt_lock};
- mount_cond.wait(l);
// timo == 0 preserves the old behavior: wait forever for the acks.
// Otherwise wait_for() returns false on timeout and we force-close
// whatever is still outstanding with -ETIMEDOUT.
+ if (!timo) {
+ mount_cond.wait(l);
+ } else if (!mount_cond.wait_for(l, ceph::make_timespan(timo), [this] { return mds_ranks_closing.empty(); })) {
+ ldout(cct, 1) << mds_ranks_closing.size() << " mds(s) did not respond to session close -- timing out." << dendl;
+ while (!mds_ranks_closing.empty()) {
// NOTE(review): `auto session` copies the MetaSession out of the map;
// _closed_mds_session then mutates the copy. The erase inside it keys
// on mds_num so the loop still terminates, but state written to the
// copy is discarded — confirm a reference (auto&) was not intended.
+ auto session = mds_sessions.at(*mds_ranks_closing.begin());
+ // this prunes entry from mds_sessions and mds_ranks_closing
+ _closed_mds_session(&session, -ETIMEDOUT);
+ }
+ }
+
+ mds_ranks_closing.clear();
// Hand the lock back without unlocking (it was adopted above).
l.release();
}
}
// Force-close all sessions
while(!mds_sessions.empty()) {
auto& session = mds_sessions.begin()->second;
- _closed_mds_session(&session);
+ _closed_mds_session(&session, err);
}
}
}
trim_cache(true);
+
+ if (blacklisted && mounted &&
+ last_auto_reconnect + 30 * 60 < now &&
+ cct->_conf.get_val<bool>("client_reconnect_stale")) {
+ messenger->client_reset();
+ fd_gen++; // invalidate open files
+ blacklisted = false;
+ _kick_stale_sessions();
+ last_auto_reconnect = now;
+ }
}
void Client::renew_caps()
Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
{
ceph_assert(in);
- Fh *f = new Fh(in, flags, cmode, perms);
+ Fh *f = new Fh(in, flags, cmode, fd_gen, perms);
ldout(cct, 10) << __func__ << " " << in->ino << " mode " << cmode << dendl;
if (cmode & CEPH_FILE_MODE_RD)
need |= CEPH_CAP_FILE_RD;
- result = get_caps(in, need, want, &have, -1);
+ Fh fh(in, flags, cmode, fd_gen, perms);
+ result = get_caps(&fh, need, want, &have, -1);
if (result < 0) {
ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
" . Denying open: " <<
int Client::read(int fd, char *buf, loff_t size, loff_t offset)
{
- std::lock_guard lock(client_lock);
+ std::unique_lock lock(client_lock);
tout(cct) << "read" << std::endl;
tout(cct) << fd << std::endl;
tout(cct) << size << std::endl;
int r = _read(f, offset, size, &bl);
ldout(cct, 3) << "read(" << fd << ", " << (void*)buf << ", " << size << ", " << offset << ") = " << r << dendl;
if (r >= 0) {
+ lock.unlock();
bl.begin().copy(bl.length(), buf);
r = bl.length();
}
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- r = get_caps(in, CEPH_CAP_FILE_RD, want, &have, -1);
+ r = get_caps(f, CEPH_CAP_FILE_RD, want, &have, -1);
if (r < 0) {
goto done;
}
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
- int r = get_caps(in, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
+ int r = get_caps(f, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
if (r < 0)
return r;
in->truncate_size, in->truncate_seq,
&onfinish);
client_lock.unlock();
- onfinish.wait();
+ r = onfinish.wait();
client_lock.lock();
_sync_write_commit(in);
+ if (r < 0)
+ goto done;
}
// if we get here, write was successful, update client metadata
<< " type " << fl->l_type << " owner " << owner
<< " " << fl->l_start << "~" << fl->l_len << dendl;
+ if (in->flags & I_ERROR_FILELOCK)
+ return -EIO;
+
int lock_cmd;
if (F_RDLCK == fl->l_type)
lock_cmd = CEPH_LOCK_SHARED;
Inode *in = fh->inode.get();
ldout(cct, 10) << __func__ << " " << fh << " ino " << in->ino << dendl;
+ list<ceph_filelock> activated_locks;
+
list<pair<int, ceph_filelock> > to_release;
if (fh->fcntl_locks) {
auto &lock_state = fh->fcntl_locks;
- for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
- p != lock_state->held_locks.end();
- ++p)
- to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, p->second));
+ for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+ auto q = p++;
+ if (in->flags & I_ERROR_FILELOCK) {
+ lock_state->remove_lock(q->second, activated_locks);
+ } else {
+ to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, q->second));
+ }
+ }
lock_state.reset();
}
if (fh->flock_locks) {
auto &lock_state = fh->flock_locks;
- for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
- p != lock_state->held_locks.end();
- ++p)
- to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, p->second));
+ for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
+ auto q = p++;
+ if (in->flags & I_ERROR_FILELOCK) {
+ lock_state->remove_lock(q->second, activated_locks);
+ } else {
+ to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, q->second));
+ }
+ }
lock_state.reset();
}
- if (to_release.empty())
- return;
+ if ((in->flags & I_ERROR_FILELOCK) && !in->has_any_filelocks())
+ in->flags &= ~I_ERROR_FILELOCK;
- // mds has already released filelocks if session was closed.
- if (in->caps.empty())
+ if (to_release.empty())
return;
struct flock fl;
in->mtime = diri->mtime;
in->ctime = diri->ctime;
in->btime = diri->btime;
+ in->atime = diri->atime;
in->size = diri->size;
in->change_attr = diri->change_attr;
return -EROFS;
}
+ if (size == 0) {
+ value = "";
+ } else if (value == NULL) {
+ return -EINVAL;
+ }
+
bool posix_acl_xattr = false;
if (acl_type == POSIX_ACL)
posix_acl_xattr = !strncmp(name, "system.", 7);
bool Client::_vxattrcb_quota_exists(Inode *in)
{
return in->quota.is_enable() &&
- in->snaprealm && in->snaprealm->ino == in->ino;
+ (in->snapid != CEPH_NOSNAP ||
+ (in->snaprealm && in->snaprealm->ino == in->ino));
}
size_t Client::_vxattrcb_quota(Inode *in, char *val, size_t size)
{
/* We can't return bytes written larger than INT_MAX, clamp len to that */
len = std::min(len, (loff_t)INT_MAX);
- return _read(fh, off, len, bl);
+ int r = _read(fh, off, len, bl);
+ ldout(cct, 3) << "ll_read " << fh << " " << off << "~" << len << " = " << r
+ << dendl;
+ return r;
}
int Client::ll_read_block(Inode *in, uint64_t blockid,
}
int have;
- int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
+ int r = get_caps(fh, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
if (r < 0)
return r;
case MetaSession::STATE_OPEN:
{
objecter->maybe_request_map(); /* to check if we are blacklisted */
- const auto& conf = cct->_conf;
- if (conf->client_reconnect_stale) {
+ if (cct->_conf.get_val<bool>("client_reconnect_stale")) {
ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
_closed_mds_session(s);
} else {
MetaSession *session;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
+ if (session->state == MetaSession::STATE_REJECTED)
+ return -EPERM;
if (session->state != MetaSession::STATE_OPENING) {
// umounting?
return -EINVAL;
}
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
- if (rejected_by_mds.count(mds))
- return -EPERM;
continue;
}