#include "messages/MMDSLoadTargets.h"
#include "messages/MMDSTableRequest.h"
+#include "mgr/MgrClient.h"
+
#include "MDSDaemon.h"
#include "MDSMap.h"
#include "SnapClient.h"
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
-
+using TOPNSPC::common::cmd_getval;
class C_Flush_Journal : public MDSInternalContext {
public:
C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
}
void send() {
- assert(mds->mds_lock.is_locked());
+ assert(ceph_mutex_is_locked(mds->mds_lock));
dout(20) << __func__ << dendl;
// previous segments for expiry
mdlog->start_new_segment();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_flush_mdlog(r);
});
void clear_mdlog() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_clear_mdlog(r);
});
return;
}
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_expire_segments(r);
});
expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
void trim_segments() {
dout(20) << __func__ << dendl;
- Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
+ Context *ctx = new C_OnFinisher(new LambdaContext([this](int) {
std::lock_guard locker(mds->mds_lock);
trim_expired_segments();
}), mds->finisher);
void write_journal_head() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
std::lock_guard locker(mds->mds_lock);
handle_write_head(r);
});
void send() {
// not really a hard requirement here, but let's ensure this in
// case we change the logic here.
- assert(mds->mds_lock.is_locked());
+ assert(ceph_mutex_is_locked(mds->mds_lock));
dout(20) << __func__ << dendl;
f->open_object_section("result");
C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
: MDSInternalContext(mds),
timeout(timeout),
- lock("mds::context::timeout", false, true),
on_finish(on_finish) {
}
~C_ContextTimeout() {
return;
}
- timer_task = new FunctionContext([this](int _) {
+ timer_task = new LambdaContext([this](int) {
timer_task = nullptr;
complete(-ETIMEDOUT);
});
}
uint64_t timeout;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("mds::context::timeout");
Context *on_finish = nullptr;
Context *timer_task = nullptr;
};
caps_recalled += count;
if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
C_ContextTimeout *ctx = new C_ContextTimeout(
- mds, 1, new FunctionContext([this](int r) {
+ mds, 1, new LambdaContext([this](int r) {
recall_client_state();
}));
ctx->start_timer();
} else {
uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
C_ContextTimeout *ctx = new C_ContextTimeout(
- mds, remaining, new FunctionContext([this](int r) {
+ mds, remaining, new LambdaContext([this](int r) {
handle_recall_client_state(r);
}));
void flush_journal() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_flush_journal(r);
});
auto [throttled, count] = do_trim();
if (throttled && count > 0) {
- auto timer = new FunctionContext([this](int _) {
+ auto timer = new LambdaContext([this](int) {
trim_cache();
});
mds->timer.add_event_after(1.0, timer);
MDSRank::MDSRank(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
std::unique_ptr<MDSMap>& mdsmap_,
Messenger *msgr,
MonClient *monc_,
+ MgrClient *mgrc,
Context *respawn_hook_,
- Context *suicide_hook_)
- :
- whoami(whoami_), incarnation(0),
- mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
- mdsmap(mdsmap_),
+ Context *suicide_hook_) :
+ cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
+ timer(timer_), mdsmap(mdsmap_),
objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
- server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
- balancer(NULL), scrubstack(NULL),
- damage_table(whoami_),
- inotable(NULL), snapserver(NULL), snapclient(NULL),
- sessionmap(this), logger(NULL), mlogger(NULL),
+ damage_table(whoami_), sessionmap(this),
op_tracker(g_ceph_context, g_conf()->mds_enable_op_tracker,
g_conf()->osd_num_op_tracker_shard),
- last_state(MDSMap::STATE_BOOT),
- state(MDSMap::STATE_BOOT),
- cluster_degraded(false), stopping(false),
+ progress_thread(this), whoami(whoami_),
purge_queue(g_ceph_context, whoami_,
mdsmap_->get_metadata_pool(), objecter,
- new FunctionContext(
- [this](int r){
- // Purge Queue operates inside mds_lock when we're calling into
- // it, and outside when in background, so must handle both cases.
- if (mds_lock.is_locked_by_me()) {
- handle_write_error(r);
- } else {
- std::lock_guard l(mds_lock);
- handle_write_error(r);
- }
- }
+ new LambdaContext([this](int r) {
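+ // note: assumes PurgeQueue invokes this error callback without mds_lock held (ceph::mutex is not recursive)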
+ std::lock_guard l(mds_lock);
+ handle_write_error(r);
+ }
)
),
- progress_thread(this), dispatch_depth(0),
- hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
- mds_slow_req_count(0),
- last_client_mdsmap_bcast(0),
- messenger(msgr), monc(monc_),
+ beacon(beacon_),
+ messenger(msgr), monc(monc_), mgrc(mgrc),
respawn_hook(respawn_hook_),
suicide_hook(suicide_hook_),
- standby_replaying(false),
starttime(mono_clock::now())
{
hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
purge_queue.update_op_limit(*mdsmap);
- objecter->unset_honor_osdmap_full();
+ objecter->unset_honor_pool_full();
- finisher = new Finisher(cct);
+ finisher = new Finisher(cct, "MDSRank", "MR_Finisher");
mdcache = new MDCache(this, purge_queue);
mdlog = new MDLog(this);
balancer = new MDBalancer(this, messenger, monc);
- scrubstack = new ScrubStack(mdcache, finisher);
+ scrubstack = new ScrubStack(mdcache, clog, finisher);
inotable = new InoTable(this);
snapserver = new SnapServer(this, monc);
cct->_conf->mds_op_log_threshold);
op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
cct->_conf->mds_op_history_duration);
+
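+ // kick off the periodic task status updates sent to the mgr (see send_task_status below)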
+ schedule_update_timer_task();
}
MDSRank::~MDSRank()
if (send) {
dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
- auto m = MMDSLoadTargets::create(mds_gid_t(monc->get_global_id()), new_map_targets);
+ auto m = make_message<MMDSLoadTargets>(mds_gid_t(monc->get_global_id()), new_map_targets);
monc->send_mon_message(m.detach());
}
}
purge_queue.shutdown();
- mds_lock.Unlock();
+ mds_lock.unlock();
finisher->stop(); // no flushing
- mds_lock.Lock();
+ mds_lock.lock();
if (objecter->initialized)
objecter->shutdown();
// release mds_lock for finisher/messenger threads (e.g.
// MDSDaemon::ms_handle_reset called from Messenger).
- mds_lock.Unlock();
+ mds_lock.unlock();
// shut down messenger
messenger->shutdown();
- mds_lock.Lock();
+ mds_lock.lock();
// Workaround unclean shutdown: HeartbeatMap will assert if
// worker is not removed (as we do in ~MDS), but ~MDS is not
void MDSRank::damaged()
{
ceph_assert(whoami != MDS_RANK_NONE);
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED);
monc->flush_log(); // Flush any clog error from before we were called
void *MDSRank::ProgressThread::entry()
{
- std::lock_guard l(mds->mds_lock);
+ std::unique_lock l(mds->mds_lock);
while (true) {
- while (!mds->stopping &&
- mds->finished_queue.empty() &&
- (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
- cond.Wait(mds->mds_lock);
- }
+ cond.wait(l, [this] {
+ return (mds->stopping ||
+ !mds->finished_queue.empty() ||
+ (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy()));
+ });
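+ // wait predicate is the negation of the old loop condition: wake on stop, queued contexts, or dispatchable (non-laggy) work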
if (mds->stopping) {
break;
void MDSRank::ProgressThread::shutdown()
{
- ceph_assert(mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
ceph_assert(mds->stopping);
if (am_self()) {
// Stopping is set, we will fall out of our main loop naturally
} else {
// Kick the thread to notice mds->stopping, and join it
- cond.Signal();
- mds->mds_lock.Unlock();
+ cond.notify_all();
+ mds->mds_lock.unlock();
if (is_started())
join();
- mds->mds_lock.Lock();
+ mds->mds_lock.lock();
}
}
-bool MDSRankDispatcher::ms_dispatch(const Message::const_ref &m)
+bool MDSRankDispatcher::ms_dispatch(const cref_t<Message> &m)
{
if (m->get_source().is_client()) {
Session *session = static_cast<Session*>(m->get_connection()->get_priv().get());
return ret;
}
-bool MDSRank::_dispatch(const Message::const_ref &m, bool new_msg)
+bool MDSRank::_dispatch(const cref_t<Message> &m, bool new_msg)
{
if (is_stale_message(m)) {
return true;
// pick a random dir inode
CInode *in = mdcache->hack_pick_random_inode();
- list<CDir*> ls;
- in->get_dirfrags(ls);
+ auto&& ls = in->get_dirfrags();
if (!ls.empty()) { // must be an open dir.
- list<CDir*>::iterator p = ls.begin();
- int n = rand() % ls.size();
- while (n--)
- ++p;
- CDir *dir = *p;
+ const auto& dir = ls[rand() % ls.size()];
if (!dir->get_parent_dir()) continue; // must be linked.
if (!dir->is_auth()) continue; // must be auth.
// pick a random dir inode
CInode *in = mdcache->hack_pick_random_inode();
- list<CDir*> ls;
- in->get_dirfrags(ls);
+ auto&& ls = in->get_dirfrags();
if (ls.empty()) continue; // must be an open dir.
CDir *dir = ls.front();
if (!dir->get_parent_dir()) continue; // must be linked.
/*
* lower priority messages we defer if we seem laggy
*/
-bool MDSRank::handle_deferrable_message(const Message::const_ref &m)
+bool MDSRank::handle_deferrable_message(const cref_t<Message> &m)
{
int port = m->get_type() & 0xff00;
case MSG_MDS_TABLE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
{
- const MMDSTableRequest::const_ref &req = MMDSTableRequest::msgref_cast(m);
+ const cref_t<MMDSTableRequest> &req = ref_cast<MMDSTableRequest>(m);
if (req->op < 0) {
MDSTableClient *client = get_table_client(req->table);
client->handle_request(req);
*/
void MDSRank::_advance_queues()
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
if (!finished_queue.empty()) {
dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
if (beacon.is_laggy())
break;
- Message::const_ref old = waiting_for_nolaggy.front();
+ cref_t<Message> old = waiting_for_nolaggy.front();
waiting_for_nolaggy.pop_front();
if (!is_stale_message(old)) {
g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
}
-bool MDSRank::is_stale_message(const Message::const_ref &m) const
+bool MDSRank::is_stale_message(const cref_t<Message> &m) const
{
// from bad mds?
if (m->get_source().is_mds()) {
return false;
}
-Session *MDSRank::get_session(const Message::const_ref &m)
+Session *MDSRank::get_session(const cref_t<Message> &m)
{
// do not carry ref
auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
return session;
}
-void MDSRank::send_message(const Message::ref& m, const ConnectionRef& c)
+void MDSRank::send_message(const ref_t<Message>& m, const ConnectionRef& c)
{
ceph_assert(c);
c->send_message2(m);
}
-void MDSRank::send_message_mds(const Message::ref& m, mds_rank_t mds)
+void MDSRank::send_message_mds(const ref_t<Message>& m, mds_rank_t mds)
{
if (!mdsmap->is_up(mds)) {
dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
// send mdsmap first?
if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
- auto _m = MMDSMap::create(monc->get_fsid(), *mdsmap);
+ auto _m = make_message<MMDSMap>(monc->get_fsid(), *mdsmap);
messenger->send_to_mds(_m.detach(), mdsmap->get_addrs(mds));
peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
}
// send message
- messenger->send_to_mds(Message::ref(m).detach(), mdsmap->get_addrs(mds));
+ messenger->send_to_mds(ref_t<Message>(m).detach(), mdsmap->get_addrs(mds));
}
-void MDSRank::forward_message_mds(const MClientRequest::const_ref& m, mds_rank_t mds)
+void MDSRank::forward_message_mds(const cref_t<MClientRequest>& m, mds_rank_t mds)
{
ceph_assert(mds != whoami);
// tell the client where it should go
auto session = get_session(m);
- auto f = MClientRequestForward::create(m->get_tid(), mds, m->get_num_fwd()+1, client_must_resend);
+ auto f = make_message<MClientRequestForward>(m->get_tid(), mds, m->get_num_fwd()+1, client_must_resend);
send_message_client(f, session);
}
-void MDSRank::send_message_client_counted(const Message::ref& m, client_t client)
+void MDSRank::send_message_client_counted(const ref_t<Message>& m, client_t client)
{
Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
if (session) {
}
}
-void MDSRank::send_message_client_counted(const Message::ref& m, const ConnectionRef& connection)
+void MDSRank::send_message_client_counted(const ref_t<Message>& m, const ConnectionRef& connection)
{
// do not carry ref
auto session = static_cast<Session *>(connection->get_priv().get());
}
}
-void MDSRank::send_message_client_counted(const Message::ref& m, Session* session)
+void MDSRank::send_message_client_counted(const ref_t<Message>& m, Session* session)
{
version_t seq = session->inc_push_seq();
dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
}
}
-void MDSRank::send_message_client(const Message::ref& m, Session* session)
+void MDSRank::send_message_client(const ref_t<Message>& m, Session* session)
{
dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
if (session->get_connection()) {
osd_epoch_barrier = e;
}
-void MDSRank::retry_dispatch(const Message::const_ref &m)
+void MDSRank::retry_dispatch(const cref_t<Message> &m)
{
inc_dispatch_depth();
_dispatch(m, false);
void MDSRank::validate_sessions()
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
bool valid = true;
// Identify any sessions which have state inconsistent with other,
return;
mdcache->start_recovered_truncates();
+ mdcache->start_purge_inodes();
mdcache->do_file_recover();
// tell connected clients
}
void MDSRankDispatcher::handle_mds_map(
- const MMDSMap::const_ref &m,
+ const cref_t<MMDSMap> &m,
const MDSMap &oldmap)
{
// I am only to be passed MDSMaps in which I hold a rank
if (objecter->get_client_incarnation() != incarnation)
objecter->set_client_incarnation(incarnation);
- if (oldmap.get_min_compat_client() != mdsmap->get_min_compat_client())
+ if (mdsmap->get_min_compat_client() < ceph_release_t::max &&
+ oldmap.get_min_compat_client() != mdsmap->get_min_compat_client())
server->update_required_client_features();
// for debug
purge_queue.update_op_limit(*mdsmap);
}
+ if (mdsmap->get_inline_data_enabled() && !oldmap.get_inline_data_enabled())
+ dout(0) << "WARNING: inline_data support has been deprecated and will be removed in a future release" << dendl;
+
if (scrubstack->is_scrubbing()) {
if (mdsmap->get_max_mds() > 1) {
auto c = new C_MDSInternalNoop;
snapclient->handle_mds_failure(who);
}
-bool MDSRankDispatcher::handle_asok_command(std::string_view command,
- const cmdmap_t& cmdmap,
- Formatter *f,
- std::ostream& ss)
+void MDSRankDispatcher::handle_asok_command(
+ std::string_view command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ const bufferlist &inbl,
+ std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
+ int r = 0;
+ stringstream ss;
+ bufferlist outbl;
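+ // synchronous commands fall through to "out" and reply via on_finish;
+ // asynchronous ones (session/client evict, scrub start/abort/pause,
+ // cache drop) pass on_finish along and return early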
if (command == "dump_ops_in_flight" ||
- command == "ops") {
+ command == "ops") {
if (!op_tracker.dump_ops_in_flight(f)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
}
} else if (command == "dump_blocked_ops") {
if (!op_tracker.dump_ops_in_flight(f, true)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
}
} else if (command == "dump_historic_ops") {
if (!op_tracker.dump_historic_ops(f)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
}
} else if (command == "dump_historic_ops_by_duration") {
if (!op_tracker.dump_historic_ops(f, true)) {
- ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
- please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
}
} else if (command == "osdmap barrier") {
int64_t target_epoch = 0;
- bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
+ bool got_val = cmd_getval(cmdmap, "target_epoch", target_epoch);
if (!got_val) {
ss << "no target epoch given";
- return true;
+ r = -EINVAL;
+ goto out;
+ }
+ {
+ std::lock_guard l(mds_lock);
+ set_osd_epoch_barrier(target_epoch);
}
-
- mds_lock.Lock();
- set_osd_epoch_barrier(target_epoch);
- mds_lock.Unlock();
-
C_SaferCond cond;
bool already_got = objecter->wait_for_map(target_epoch, &cond);
if (!already_got) {
dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
cond.wait();
}
- } else if (command == "session ls") {
+ } else if (command == "session ls" ||
+ command == "client ls") {
std::lock_guard l(mds_lock);
+ std::vector<std::string> filter_args;
+ cmd_getval(cmdmap, "filters", filter_args);
+ SessionFilter filter;
+ r = filter.parse(filter_args, &ss);
+ if (r != 0) {
+ goto out;
+ }
+ dump_sessions(filter, f);
+ } else if (command == "session evict" ||
+ command == "client evict") {
+ std::lock_guard l(mds_lock);
+ std::vector<std::string> filter_args;
+ cmd_getval(cmdmap, "filters", filter_args);
- heartbeat_reset();
-
- dump_sessions(SessionFilter(), f);
- } else if (command == "session evict") {
+ SessionFilter filter;
+ r = filter.parse(filter_args, &ss);
+ if (r != 0) {
+ r = -EINVAL;
+ goto out;
+ }
+ evict_clients(filter, on_finish);
+ return;
+ } else if (command == "session kill") {
std::string client_id;
- const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
- if(!got_arg) {
+ if (!cmd_getval(cmdmap, "client_id", client_id)) {
ss << "Invalid client_id specified";
- return true;
+ r = -ENOENT;
+ goto out;
}
-
- mds_lock.Lock();
- std::stringstream dss;
+ std::lock_guard l(mds_lock);
bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
- g_conf()->mds_session_blacklist_on_evict, dss);
+ g_conf()->mds_session_blacklist_on_evict, ss);
if (!evicted) {
- dout(15) << dss.str() << dendl;
- ss << dss.str();
+ dout(15) << ss.str() << dendl;
+ r = -ENOENT;
}
- mds_lock.Unlock();
- } else if (command == "session config") {
+ } else if (command == "session config" ||
+ command == "client config") {
int64_t client_id;
std::string option;
std::string value;
- cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
- cmd_getval(g_ceph_context, cmdmap, "option", option);
- bool got_value = cmd_getval(g_ceph_context, cmdmap, "value", value);
+ cmd_getval(cmdmap, "client_id", client_id);
+ cmd_getval(cmdmap, "option", option);
+ bool got_value = cmd_getval(cmdmap, "value", value);
- mds_lock.Lock();
- config_client(client_id, !got_value, option, value, ss);
- mds_lock.Unlock();
- } else if (command == "scrub_path") {
+ std::lock_guard l(mds_lock);
+ r = config_client(client_id, !got_value, option, value, ss);
+ } else if (command == "scrub start" ||
+ command == "scrub_start") {
string path;
+ string tag;
vector<string> scrubop_vec;
- cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
- cmd_getval(g_ceph_context, cmdmap, "path", path);
+ cmd_getval(cmdmap, "scrubops", scrubop_vec);
+ cmd_getval(cmdmap, "path", path);
+ cmd_getval(cmdmap, "tag", tag);
/* Multiple MDS scrub is not currently supported. See also: https://tracker.ceph.com/issues/12274 */
if (mdsmap->get_max_mds() > 1) {
ss << "Scrub is not currently supported for multiple active MDS. Please reduce max_mds to 1 and then scrub.";
- return true;
- }
-
- C_SaferCond cond;
- command_scrub_start(f, path, "", scrubop_vec, &cond);
- cond.wait();
+ r = -EINVAL;
+ goto out;
+ }
+
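+ // run asynchronously on the finisher; the nested LambdaContext forwards the result back through on_finish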
+ finisher->queue(
+ new LambdaContext(
+ [this, on_finish, f, path, tag, scrubop_vec](int r) {
+ command_scrub_start(
+ f, path, tag, scrubop_vec,
+ new LambdaContext(
+ [on_finish](int r) {
+ bufferlist outbl;
+ on_finish(r, {}, outbl);
+ }));
+ }));
+ return;
+ } else if (command == "scrub abort") {
+ finisher->queue(
+ new LambdaContext(
+ [this, on_finish, f](int r) {
+ command_scrub_abort(
+ f,
+ new LambdaContext(
+ [on_finish, f](int r) {
+ bufferlist outbl;
+ f->open_object_section("result");
+ f->dump_int("return_code", r);
+ f->close_section();
+ on_finish(r, {}, outbl);
+ }));
+ }));
+ return;
+ } else if (command == "scrub pause") {
+ finisher->queue(
+ new LambdaContext(
+ [this, on_finish, f](int r) {
+ command_scrub_pause(
+ f,
+ new LambdaContext(
+ [on_finish, f](int r) {
+ bufferlist outbl;
+ f->open_object_section("result");
+ f->dump_int("return_code", r);
+ f->close_section();
+ on_finish(r, {}, outbl);
+ }));
+ }));
+ return;
+ } else if (command == "scrub resume") {
+ command_scrub_resume(f);
+ } else if (command == "scrub status") {
+ command_scrub_status(f);
} else if (command == "tag path") {
string path;
- cmd_getval(g_ceph_context, cmdmap, "path", path);
+ cmd_getval(cmdmap, "path", path);
string tag;
- cmd_getval(g_ceph_context, cmdmap, "tag", tag);
+ cmd_getval(cmdmap, "tag", tag);
command_tag_path(f, path, tag);
} else if (command == "flush_path") {
string path;
- cmd_getval(g_ceph_context, cmdmap, "path", path);
+ cmd_getval(cmdmap, "path", path);
command_flush_path(f, path);
} else if (command == "flush journal") {
command_flush_journal(f);
command_get_subtrees(f);
} else if (command == "export dir") {
string path;
- if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
+ if(!cmd_getval(cmdmap, "path", path)) {
ss << "malformed path";
- return true;
+ r = -EINVAL;
+ goto out;
}
int64_t rank;
- if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
+ if(!cmd_getval(cmdmap, "rank", rank)) {
ss << "malformed rank";
- return true;
+ r = -EINVAL;
+ goto out;
}
command_export_dir(f, path, (mds_rank_t)rank);
} else if (command == "dump cache") {
std::lock_guard l(mds_lock);
string path;
- int r;
- if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
+ if (!cmd_getval(cmdmap, "path", path)) {
r = mdcache->dump_cache(f);
} else {
r = mdcache->dump_cache(path);
}
-
- if (r != 0) {
- ss << "Failed to dump cache: " << cpp_strerror(r);
- f->reset();
- }
+ } else if (command == "cache drop") {
+ int64_t timeout = 0;
+ cmd_getval(cmdmap, "timeout", timeout);
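+ // as with scrub, execute on the finisher and reply through on_finish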
+ finisher->queue(
+ new LambdaContext(
+ [this, on_finish, f, timeout](int r) {
+ command_cache_drop(
+ timeout, f,
+ new LambdaContext(
+ [on_finish](int r) {
+ bufferlist outbl;
+ on_finish(r, {}, outbl);
+ }));
+ }));
+ return;
} else if (command == "cache status") {
std::lock_guard l(mds_lock);
mdcache->cache_status(f);
command_dump_tree(cmdmap, ss, f);
} else if (command == "dump loads") {
std::lock_guard l(mds_lock);
- int r = balancer->dump_loads(f);
- if (r != 0) {
- ss << "Failed to dump loads: " << cpp_strerror(r);
- f->reset();
- }
+ r = balancer->dump_loads(f);
} else if (command == "dump snaps") {
std::lock_guard l(mds_lock);
string server;
- cmd_getval(g_ceph_context, cmdmap, "server", server);
+ cmd_getval(cmdmap, "server", server);
if (server == "--server") {
if (mdsmap->get_tableserver() == whoami) {
snapserver->dump(f);
} else {
+ r = -EXDEV;
ss << "Not snapserver";
}
} else {
- int r = snapclient->dump_cache(f);
- if (r != 0) {
- ss << "Failed to dump snapclient: " << cpp_strerror(r);
- f->reset();
- }
+ r = snapclient->dump_cache(f);
}
} else if (command == "force_readonly") {
std::lock_guard l(mds_lock);
command_openfiles_ls(f);
} else if (command == "dump inode") {
command_dump_inode(f, cmdmap, ss);
+ } else if (command == "damage ls") {
+ std::lock_guard l(mds_lock);
+ damage_table.dump(f);
+ } else if (command == "damage rm") {
+ std::lock_guard l(mds_lock);
+ damage_entry_id_t id = 0;
+ if (!cmd_getval(cmdmap, "damage_id", (int64_t&)id)) {
+ r = -EINVAL;
+ goto out;
+ }
+ damage_table.erase(id);
} else {
- return false;
+ r = -ENOSYS;
}
-
- return true;
+out:
+ on_finish(r, ss.str(), outbl);
}
-class C_MDS_Send_Command_Reply : public MDSInternalContext {
-protected:
- MCommand::const_ref m;
-public:
- C_MDS_Send_Command_Reply(MDSRank *_mds, const MCommand::const_ref &_m) :
- MDSInternalContext(_mds), m(_m) {}
-
- void send(int r, std::string_view ss) {
- std::stringstream ds;
- send(r, ss, ds);
- }
-
- void send(int r, std::string_view ss, std::stringstream &ds) {
- bufferlist bl;
- bl.append(ds);
- MDSDaemon::send_command_reply(m, mds, r, bl, ss);
- }
-
- void finish(int r) override {
- send(r, "");
- }
-};
-
-class C_ExecAndReply : public C_MDS_Send_Command_Reply {
-public:
- C_ExecAndReply(MDSRank *mds, const MCommand::const_ref &m)
- : C_MDS_Send_Command_Reply(mds, m), f(true) {
- }
-
- void finish(int r) override {
- std::stringstream ds;
- std::stringstream ss;
- if (r != 0) {
- f.flush(ss);
- } else {
- f.flush(ds);
- }
-
- send(r, ss.str(), ds);
- }
-
- virtual void exec() = 0;
-
-protected:
- JSONFormatter f;
-};
-
-class C_CacheDropExecAndReply : public C_ExecAndReply {
-public:
- C_CacheDropExecAndReply(MDSRank *mds, const MCommand::const_ref &m,
- uint64_t timeout)
- : C_ExecAndReply(mds, m), timeout(timeout) {
- }
-
- void exec() override {
- mds->command_cache_drop(timeout, &f, this);
- }
-
-private:
- uint64_t timeout;
-};
-
-class C_ScrubExecAndReply : public C_ExecAndReply {
-public:
- C_ScrubExecAndReply(MDSRank *mds, const MCommand::const_ref &m,
- const std::string &path, const std::string &tag,
- const std::vector<std::string> &scrubop)
- : C_ExecAndReply(mds, m), path(path), tag(tag), scrubop(scrubop) {
- }
-
- void exec() override {
- mds->command_scrub_start(&f, path, tag, scrubop, this);
- }
-
-private:
- std::string path;
- std::string tag;
- std::vector<std::string> scrubop;
-};
-
-class C_ScrubControlExecAndReply : public C_ExecAndReply {
-public:
- C_ScrubControlExecAndReply(MDSRank *mds, const MCommand::const_ref &m,
- const std::string &command)
- : C_ExecAndReply(mds, m), command(command) {
- }
-
- void exec() override {
- if (command == "abort") {
- mds->command_scrub_abort(&f, this);
- } else if (command == "pause") {
- mds->command_scrub_pause(&f, this);
- } else {
- ceph_abort();
- }
- }
-
- void finish(int r) override {
- f.open_object_section("result");
- f.dump_int("return_code", r);
- f.close_section();
- C_ExecAndReply::finish(r);
- }
-
-private:
- std::string command;
-};
-
/**
* This function drops the mds_lock, so don't do anything with
* MDSRank after calling it (we could have gone into shutdown): just
* send your result back to the calling client and finish.
*/
-void MDSRankDispatcher::evict_clients(const SessionFilter &filter, const MCommand::const_ref &m)
+void MDSRankDispatcher::evict_clients(
+ const SessionFilter &filter,
+ std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
- C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
-
+ bufferlist outbl;
if (is_any_replay()) {
- reply->send(-EAGAIN, "MDS is replaying log");
- delete reply;
+ on_finish(-EAGAIN, "MDS is replaying log", outbl);
return;
}
Session *s = p.second;
- if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
+ if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server,
+ std::placeholders::_1))) {
victims.push_back(s);
}
}
dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
if (victims.empty()) {
- reply->send(0, "");
- delete reply;
+ on_finish(0, {}, outbl);
return;
}
- C_GatherBuilder gather(g_ceph_context, reply);
+ C_GatherBuilder gather(g_ceph_context,
+ new LambdaContext([on_finish](int r) {
+ bufferlist bl;
+ on_finish(r, {}, bl);
+ }));
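+ // the gather replies via on_finish once every eviction below has completed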
for (const auto s : victims) {
std::stringstream ss;
evict_client(s->get_client().v, false,
}
void MDSRank::command_scrub_resume(Formatter *f) {
+ std::lock_guard l(mds_lock);
int r = scrubstack->scrub_resume();
f->open_object_section("result");
}
void MDSRank::command_scrub_status(Formatter *f) {
+ std::lock_guard l(mds_lock);
scrubstack->scrub_status(f);
}
{
std::string root;
int64_t depth;
- cmd_getval(g_ceph_context, cmdmap, "root", root);
- if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
+ cmd_getval(cmdmap, "root", root);
+ if (!cmd_getval(cmdmap, "depth", depth))
depth = -1;
std::lock_guard l(mds_lock);
CInode *in = mdcache->cache_traverse(filepath(root.c_str()));
std::ostream &ss)
{
std::string path;
- bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
+ bool got = cmd_getval(cmdmap, "path", path);
if (!got) {
ss << "missing path argument";
return NULL;
}
std::string frag_str;
- if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
+ if (!cmd_getval(cmdmap, "frag", frag_str)) {
ss << "missing frag argument";
return NULL;
}
{
std::lock_guard l(mds_lock);
int64_t by = 0;
- if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
+ if (!cmd_getval(cmdmap, "bits", by)) {
ss << "missing bits argument";
return false;
}
{
std::lock_guard l(mds_lock);
std::string path;
- bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
+ bool got = cmd_getval(cmdmap, "path", path);
if (!got) {
ss << "missing path argument";
return false;
}
std::string frag_str;
- if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
+ if (!cmd_getval(cmdmap, "frag", frag_str)) {
ss << "missing frag argument";
return false;
}
{
std::lock_guard l(mds_lock);
std::string path;
- bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
+ bool got = cmd_getval(cmdmap, "path", path);
if (!got) {
ss << "missing path argument";
return false;
{
std::lock_guard l(mds_lock);
int64_t number;
- bool got = cmd_getval(g_ceph_context, cmdmap, "number", number);
+ bool got = cmd_getval(cmdmap, "number", number);
if (!got) {
ss << "missing inode number";
return;
mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
- mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
bool wait, bool blacklist, std::ostream& err_ss,
Context *on_killed)
{
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
// Mutually exclusive args
ceph_assert(!(wait && on_killed != nullptr));
std::vector<std::string> cmd = {tmp};
auto kill_client_session = [this, session_id, wait, on_killed](){
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
Session *session = sessionmap.get_session(
entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
if (session) {
C_SaferCond on_safe;
server->kill_session(session, &on_safe);
- mds_lock.Unlock();
+ mds_lock.unlock();
on_safe.wait();
- mds_lock.Lock();
+ mds_lock.lock();
}
} else {
dout(1) << "session " << session_id << " was removed while we waited "
};
auto apply_blacklist = [this, cmd](std::function<void ()> fn){
- ceph_assert(mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
- Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
+ Context *on_blacklist_done = new LambdaContext([this, fn](int r) {
objecter->wait_for_latest_osdmap(
new C_OnFinisher(
- new FunctionContext([this, fn](int r) {
+ new LambdaContext([this, fn](int r) {
std::lock_guard l(mds_lock);
auto epoch = objecter->with_osdmap([](const OSDMap &o){
return o.get_epoch();
if (blacklist) {
C_SaferCond inline_ctx;
apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
- mds_lock.Unlock();
+ mds_lock.unlock();
inline_ctx.wait();
- mds_lock.Lock();
+ mds_lock.lock();
}
// We dropped mds_lock, so check that session still exists
set<Session*> clients;
sessionmap.get_client_session_set(clients);
for (const auto &session : clients) {
- auto m = MMDSMap::create(monc->get_fsid(), *mdsmap);
+ auto m = make_message<MMDSMap>(monc->get_fsid(), *mdsmap);
session->get_connection()->send_message2(std::move(m));
}
last_client_mdsmap_bcast = mdsmap->get_epoch();
}
-Context *MDSRank::create_async_exec_context(C_ExecAndReply *ctx) {
- return new C_OnFinisher(new FunctionContext([ctx](int _) {
- ctx->exec();
- }), finisher);
-}
-
MDSRankDispatcher::MDSRankDispatcher(
mds_rank_t whoami_,
- Mutex &mds_lock_,
+ ceph::mutex &mds_lock_,
LogChannelRef &clog_,
SafeTimer &timer_,
Beacon &beacon_,
std::unique_ptr<MDSMap> &mdsmap_,
Messenger *msgr,
MonClient *monc_,
+ MgrClient *mgrc,
Context *respawn_hook_,
Context *suicide_hook_)
: MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
- msgr, monc_, respawn_hook_, suicide_hook_)
+ msgr, monc_, mgrc, respawn_hook_, suicide_hook_)
{
- g_conf().add_observer(this);
-}
-
-bool MDSRankDispatcher::handle_command(
- const cmdmap_t &cmdmap,
- const MCommand::const_ref &m,
- int *r,
- std::stringstream *ds,
- std::stringstream *ss,
- Context **run_later,
- bool *need_reply)
-{
- ceph_assert(r != nullptr);
- ceph_assert(ds != nullptr);
- ceph_assert(ss != nullptr);
-
- *need_reply = true;
-
- std::string prefix;
- cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
-
- if (prefix == "session ls" || prefix == "client ls") {
- std::vector<std::string> filter_args;
- cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
-
- SessionFilter filter;
- *r = filter.parse(filter_args, ss);
- if (*r != 0) {
- return true;
- }
-
- JSONFormatter f(true);
- dump_sessions(filter, &f);
- f.flush(*ds);
- return true;
- } else if (prefix == "session evict" || prefix == "client evict") {
- std::vector<std::string> filter_args;
- cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
-
- SessionFilter filter;
- *r = filter.parse(filter_args, ss);
- if (*r != 0) {
- return true;
- }
-
- evict_clients(filter, m);
-
- *need_reply = false;
- return true;
- } else if (prefix == "session config" || prefix == "client config") {
- int64_t client_id;
- std::string option;
- std::string value;
-
- cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
- cmd_getval(g_ceph_context, cmdmap, "option", option);
- bool got_value = cmd_getval(g_ceph_context, cmdmap, "value", value);
-
- *r = config_client(client_id, !got_value, option, value, *ss);
- return true;
- } else if (prefix == "damage ls") {
- JSONFormatter f(true);
- damage_table.dump(&f);
- f.flush(*ds);
- return true;
- } else if (prefix == "damage rm") {
- damage_entry_id_t id = 0;
- bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
- if (!got) {
- *r = -EINVAL;
- return true;
- }
-
- damage_table.erase(id);
- return true;
- } else if (prefix == "cache drop") {
- int64_t timeout;
- if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
- timeout = 0;
- }
-
- *need_reply = false;
- *run_later = create_async_exec_context(new C_CacheDropExecAndReply
- (this, m, (uint64_t)timeout));
- return true;
- } else if (prefix == "scrub start") {
- string path;
- string tag;
- vector<string> scrubop_vec;
- cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
- cmd_getval(g_ceph_context, cmdmap, "path", path);
- cmd_getval(g_ceph_context, cmdmap, "tag", tag);
-
- /* Multiple MDS scrub is not currently supported. See also: https://tracker.ceph.com/issues/12274 */
- if (mdsmap->get_max_mds() > 1) {
- *ss << "Scrub is not currently supported for multiple active MDS. Please reduce max_mds to 1 and then scrub.";
- *r = ENOTSUP;
- return true;
- }
-
- *need_reply = false;
- *run_later = create_async_exec_context(new C_ScrubExecAndReply
- (this, m, path, tag, scrubop_vec));
- return true;
- } else if (prefix == "scrub abort") {
- *need_reply = false;
- *run_later = create_async_exec_context(new C_ScrubControlExecAndReply
- (this, m, "abort"));
- return true;
- } else if (prefix == "scrub pause") {
- *need_reply = false;
- *run_later = create_async_exec_context(new C_ScrubControlExecAndReply
- (this, m, "pause"));
- return true;
- } else if (prefix == "scrub resume") {
- JSONFormatter f(true);
- command_scrub_resume(&f);
- f.flush(*ds);
- return true;
- } else if (prefix == "scrub status") {
- JSONFormatter f(true);
- command_scrub_status(&f);
- f.flush(*ds);
- return true;
- } else {
- return false;
- }
+ g_conf().add_observer(this);
}
void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
"mds_cache_memory_limit",
"mds_cache_mid",
"mds_cache_reservation",
- "mds_cache_size",
"mds_cache_trim_decay_rate",
"mds_cap_revoke_eviction_timeout",
"mds_dump_cache_threshold_file",
"mds_log_pause",
"mds_max_export_size",
"mds_max_purge_files",
+ "mds_forward_all_requests_to_auth",
"mds_max_purge_ops",
"mds_max_purge_ops_per_pg",
+ "mds_max_snaps_per_dir",
"mds_op_complaint_time",
"mds_op_history_duration",
"mds_op_history_size",
update_log_config();
}
- finisher->queue(new FunctionContext([this, changed](int r) {
+ finisher->queue(new LambdaContext([this, changed](int) {
std::scoped_lock lock(mds_lock);
if (changed.count("mds_log_pause") && !g_conf()->mds_log_pause) {
purge_queue.handle_conf_change(changed, *mdsmap);
}));
}
+
+void MDSRank::get_task_status(std::map<std::string, std::string> *status) {
+ dout(20) << __func__ << dendl;
+
+ // scrub summary for now..
+ std::string_view scrub_summary = scrubstack->scrub_summary();
+ status->emplace(SCRUB_STATUS_KEY, std::move(scrub_summary));
+}
+
+void MDSRank::schedule_update_timer_task() {
+ dout(20) << __func__ << dendl;
+
+ timer.add_event_after(g_conf().get_val<double>("mds_task_status_update_interval"),
+ new LambdaContext([this](int) {
+ send_task_status();
+ }));
+}
+
+void MDSRank::send_task_status() {
+ std::map<std::string, std::string> status;
+ get_task_status(&status);
+
+ if (!status.empty()) {
+ dout(20) << __func__ << ": updating " << status.size() << " status keys" << dendl;
+
+ int r = mgrc->service_daemon_update_task_status(std::move(status));
+ if (r < 0) {
+ derr << ": failed to update service daemon status: " << cpp_strerror(r) << dendl;
+ }
+ }
+
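+ // re-arm the timer so status is pushed periodically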
+ schedule_update_timer_task();
+}