#define dout_context g_ceph_context
#undef dout_prefix
-#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
+#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal " << __func__ << " "
#undef dout
#define dout(lvl) \
do {\
#define MIN_OFFLOAD 10 // point at which i stop trying, close enough
-int MDBalancer::proc_message(const Message::const_ref &m)
+int MDBalancer::proc_message(const cref_t<Message> &m)
{
switch (m->get_type()) {
case MSG_MDS_HEARTBEAT:
- handle_heartbeat(MHeartbeat::msgref_cast(m));
+ handle_heartbeat(ref_cast<MHeartbeat>(m));
break;
default:
}
bool remove = true;
- list<CDir*> dfls;
- in->get_dirfrags(dfls);
+ auto&& dfls = in->get_dirfrags();
for (auto dir : dfls) {
if (!dir->is_auth())
continue;
mds_load_t load{DecayRate()}; /* zero DecayRate! */
if (mds->mdcache->get_root()) {
- list<CDir*> ls;
- mds->mdcache->get_root()->get_dirfrags(ls);
+ auto&& ls = mds->mdcache->get_root()->get_dirfrags();
for (auto &d : ls) {
load.auth.add(d->pop_auth_subtree_nested);
load.all.add(d->pop_nested);
}
} else {
- dout(20) << "get_load no root, no load" << dendl;
+ dout(20) << "no root, no load" << dendl;
}
uint64_t num_requests = mds->get_num_requests();
last_get_load = now;
}
- dout(15) << "get_load " << load << dendl;
+ dout(15) << load << dendl;
return load;
}
bool ack = false;
int r = 0;
bufferlist lua_src;
- Mutex lock("lock");
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::condition_variable cond;
/* we assume that balancer is in the metadata pool */
object_t oid = object_t(mds->mdsmap->get_balancer());
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
- new C_SafeCond(&lock, &cond, &ack, &r));
+ new C_SafeCond(lock, cond, &ack, &r));
dout(15) << "launched non-blocking read tid=" << tid
<< " oid=" << oid << " oloc=" << oloc << dendl;
/* timeout: if we waste half our time waiting for RADOS, then abort! */
- auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
- lock.Lock();
- int ret_t = cond.WaitInterval(lock, utime_t(bal_interval / 2, 0));
- lock.Unlock();
-
+ std::cv_status ret_t = [&] {
+ auto bal_interval = g_conf().get_val<int64_t>("mds_bal_interval");
+ std::unique_lock locker{lock};
+ return cond.wait_for(locker, std::chrono::seconds(bal_interval / 2));
+ }();
/* success: store the balancer in memory and set the version. */
if (!r) {
- if (ret_t == ETIMEDOUT) {
+ if (ret_t == std::cv_status::timeout) {
mds->objecter->op_cancel(tid, -ECANCELED);
return -ETIMEDOUT;
}
bal_code.assign(lua_src.to_str());
bal_version.assign(oid.name);
- dout(10) << "localized balancer, bal_code=" << bal_code << dendl;
+    dout(10) << "bal_code=" << bal_code << dendl;
}
return r;
}
void MDBalancer::send_heartbeat()
{
if (mds->is_cluster_degraded()) {
- dout(10) << "send_heartbeat degraded" << dendl;
+ dout(10) << "degraded" << dendl;
return;
}
if (!mds->mdcache->is_open()) {
- dout(5) << "not open" << dendl;
+ dout(10) << "not open" << dendl;
mds->mdcache->wait_for_open(new C_Bal_SendHeartbeat(mds));
return;
}
mds_import_map[ mds->get_nodeid() ] = import_map;
- dout(5) << "mds." << mds->get_nodeid() << " epoch " << beat_epoch << " load " << load << dendl;
- for (map<mds_rank_t, float>::iterator it = import_map.begin();
- it != import_map.end();
- ++it) {
- dout(5) << " import_map from " << it->first << " -> " << it->second << dendl;
+ dout(3) << " epoch " << beat_epoch << " load " << load << dendl;
+ for (const auto& [rank, load] : import_map) {
+ dout(5) << " import_map from " << rank << " -> " << load << dendl;
}
for (const auto& r : up) {
if (r == mds->get_nodeid())
continue;
- auto hb = MHeartbeat::create(load, beat_epoch);
+ auto hb = make_message<MHeartbeat>(load, beat_epoch);
hb->get_import_map() = import_map;
mds->send_message_mds(hb, r);
}
}
-void MDBalancer::handle_heartbeat(const MHeartbeat::const_ref &m)
+void MDBalancer::handle_heartbeat(const cref_t<MHeartbeat> &m)
{
mds_rank_t who = mds_rank_t(m->get_source().num());
dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << dendl;
if (maxex <= 0 || maxim <= 0) return 0.0;
double howmuch = std::min(maxex, maxim);
- if (howmuch <= 0) return 0.0;
dout(5) << " - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;
// Do the split ASAP: enqueue it in the MDSRank waiters which are
// run at the end of dispatching the current request
mds->queue_waiter(new MDSInternalContextWrapper(mds,
- new FunctionContext(callback)));
+ new LambdaContext(std::move(callback))));
} else if (is_new) {
// Set a timer to really do the split: we don't do it immediately
// so that bursts of ops on a directory have a chance to go through
// before we freeze it.
mds->timer.add_event_after(bal_fragment_interval,
- new FunctionContext(callback));
+ new LambdaContext(std::move(callback)));
}
}
frag_t fg = dir->get_frag();
while (fg != frag_t()) {
frag_t sibfg = fg.get_sibling();
- list<CDir*> sibs;
- bool complete = diri->get_dirfrags_under(sibfg, sibs);
+ auto&& [complete, sibs] = diri->get_dirfrags_under(sibfg);
if (!complete) {
dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
break;
}
bool all = true;
- for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
- CDir *sib = *p;
+ for (auto& sib : sibs) {
if (!sib->is_auth() || !sib->should_merge()) {
all = false;
break;
};
if (merge_pending.count(frag) == 0) {
- dout(20) << __func__ << " enqueued dir " << *dir << dendl;
+ dout(20) << " enqueued dir " << *dir << dendl;
merge_pending.insert(frag);
mds->timer.add_event_after(bal_fragment_interval,
- new FunctionContext(callback));
+ new LambdaContext(std::move(callback)));
} else {
- dout(20) << __func__ << " dir already in queue " << *dir << dendl;
+ dout(20) << " dir already in queue " << *dir << dendl;
}
}
mds_rank_t whoami = mds->get_nodeid();
rebalance_time = clock::now();
- dout(5) << " prep_rebalance: cluster loads are" << dendl;
+ dout(7) << "cluster loads are" << dendl;
mds->mdcache->migrator->clear_export_queue();
mds_meta_load[i] = l;
if (whoami == 0)
- dout(5) << " mds." << i
+ dout(7) << " mds." << i
<< " " << load
<< " = " << load.mds_load()
<< " ~ " << l << dendl;
// target load
target_load = total_load / (double)cluster_size;
- dout(5) << "prep_rebalance: my load " << my_load
+ dout(7) << "my load " << my_load
<< " target " << target_load
<< " total " << total_load
<< dendl;
// under or over?
- for (auto p : load_map) {
- if (p.first < target_load * (1.0 + g_conf()->mds_bal_min_rebalance)) {
- dout(5) << " mds." << p.second << " is underloaded or barely overloaded." << dendl;
- mds_last_epoch_under_map[p.second] = beat_epoch;
+ for (const auto& [load, rank] : load_map) {
+ if (load < target_load * (1.0 + g_conf()->mds_bal_min_rebalance)) {
+ dout(7) << " mds." << rank << " is underloaded or barely overloaded." << dendl;
+ mds_last_epoch_under_map[rank] = beat_epoch;
}
}
int last_epoch_under = mds_last_epoch_under_map[whoami];
if (last_epoch_under == beat_epoch) {
- dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl;
+ dout(7) << " i am underloaded or barely overloaded, doing nothing." << dendl;
return;
}
// am i over long enough?
if (last_epoch_under && beat_epoch - last_epoch_under < 2) {
- dout(5) << " i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
+ dout(7) << " i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
return;
}
- dout(5) << " i am sufficiently overloaded" << dendl;
+ dout(7) << " i am sufficiently overloaded" << dendl;
// first separate exporters and importers
/* execute the balancer */
Mantle mantle;
int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
- dout(5) << " mantle decided that new targets=" << state.targets << dendl;
+ dout(7) << " mantle decided that new targets=" << state.targets << dendl;
/* mantle doesn't know about cluster size, so check target len here */
if ((int) state.targets.size() != cluster_size)
// search imports from target
if (import_from_map.count(target)) {
- dout(5) << " aha, looking through imports from target mds." << target << dendl;
+ dout(7) << " aha, looking through imports from target mds." << target << dendl;
for (auto p = import_from_map.equal_range(target);
p.first != p.second; ) {
CDir *dir = p.first->second.first;
double pop = p.first->second.second;
- dout(5) << "considering " << *dir << " from " << (*p.first).first << dendl;
+ dout(7) << "considering " << *dir << " from " << (*p.first).first << dendl;
auto plast = p.first++;
if (dir->inode->is_base())
ceph_assert(dir->inode->authority().first == target); // cuz that's how i put it in the map, dummy
if (pop <= amount-have) {
- dout(5) << "reexporting " << *dir << " pop " << pop
+ dout(7) << "reexporting " << *dir << " pop " << pop
<< " back to mds." << target << dendl;
mds->mdcache->migrator->export_dir_nicely(dir, target);
have += pop;
q.first++;
}
} else {
- dout(5) << "can't reexport " << *dir << ", too big " << pop << dendl;
+ dout(7) << "can't reexport " << *dir << ", too big " << pop << dendl;
}
if (amount-have < MIN_OFFLOAD)
break;
double pop = p->first;
if (pop <= amount-have && pop > MIN_REEXPORT) {
- dout(0) << "reexporting " << *dir << " pop " << pop
+ dout(5) << "reexporting " << *dir << " pop " << pop
<< " to mds." << target << dendl;
have += pop;
mds->mdcache->migrator->export_dir_nicely(dir, target);
continue;
// okay, search for fragments of my workload
- list<CDir*> exports;
+ std::vector<CDir*> exports;
for (auto p = import_pop_map.rbegin();
p != import_pop_map.rend();
++p) {
CDir *dir = p->second;
- find_exports(dir, amount, exports, have, already_exporting);
+ find_exports(dir, amount, &exports, have, already_exporting);
if (amount-have < MIN_OFFLOAD)
break;
}
//fudge = amount - have;
- for (auto dir : exports) {
+ for (const auto& dir : exports) {
dout(5) << " - exporting " << dir->pop_auth_subtree
<< " " << dir->pop_auth_subtree.meta_load()
<< " to mds." << target << " " << *dir << dendl;
}
}
- dout(5) << "rebalance done" << dendl;
+ dout(7) << "done" << dendl;
mds->mdcache->show_subtrees();
}
void MDBalancer::find_exports(CDir *dir,
double amount,
- list<CDir*>& exports,
+ std::vector<CDir*>* exports,
double& have,
set<CDir*>& already_exporting)
{
double midchunk = need * g_conf()->mds_bal_midchunk;
double minchunk = need * g_conf()->mds_bal_minchunk;
- list<CDir*> bigger_rep, bigger_unrep;
+ std::vector<CDir*> bigger_rep, bigger_unrep;
multimap<double, CDir*> smaller;
double dir_pop = dir->pop_auth_subtree.meta_load();
- dout(7) << " find_exports in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;
+ dout(7) << "in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;
double subdir_sum = 0;
for (elist<CInode*>::iterator it = dir->pop_lru_subdirs.begin_use_current();
ceph_assert(in->is_dir());
ceph_assert(in->get_parent_dir() == dir);
- list<CDir*> dfls;
- in->get_nested_dirfrags(dfls);
+ auto&& dfls = in->get_nested_dirfrags();
size_t num_idle_frags = 0;
- for (list<CDir*>::iterator p = dfls.begin();
- p != dfls.end();
- ++p) {
- CDir *subdir = *p;
+ for (const auto& subdir : dfls) {
if (already_exporting.count(subdir))
continue;
// lucky find?
if (pop > needmin && pop < needmax) {
- exports.push_back(subdir);
+ exports->push_back(subdir);
already_exporting.insert(subdir);
have += pop;
return;
dout(7) << " taking smaller " << *(*it).second << dendl;
- exports.push_back((*it).second);
+ exports->push_back((*it).second);
already_exporting.insert((*it).second);
have += (*it).first;
if (have > needmin)
}
// apprently not enough; drill deeper into the hierarchy (if non-replicated)
- for (list<CDir*>::iterator it = bigger_unrep.begin();
- it != bigger_unrep.end();
- ++it) {
- dout(15) << " descending into " << **it << dendl;
- find_exports(*it, amount, exports, have, already_exporting);
+ for (const auto& dir : bigger_unrep) {
+ dout(15) << " descending into " << *dir << dendl;
+ find_exports(dir, amount, exports, have, already_exporting);
if (have > needmin)
return;
}
++it) {
dout(7) << " taking (much) smaller " << it->first << " " << *(*it).second << dendl;
- exports.push_back((*it).second);
+ exports->push_back((*it).second);
already_exporting.insert((*it).second);
have += (*it).first;
if (have > needmin)
}
// ok fine, drill into replicated dirs
- for (list<CDir*>::iterator it = bigger_rep.begin();
- it != bigger_rep.end();
- ++it) {
- dout(7) << " descending into replicated " << **it << dendl;
- find_exports(*it, amount, exports, have, already_exporting);
+ for (const auto& dir : bigger_rep) {
+ dout(7) << " descending into replicated " << *dir << dendl;
+ find_exports(dir, amount, exports, have, already_exporting);
if (have > needmin)
return;
}
if (dir->should_split_fast()) {
queue_split(dir, true);
} else {
- dout(10) << __func__ << ": fragment already enqueued to split: "
+ dout(10) << ": fragment already enqueued to split: "
<< *dir << dendl;
}
}
const bool hot = (v > g_conf()->mds_bal_split_rd && type == META_POP_IRD) ||
(v > g_conf()->mds_bal_split_wr && type == META_POP_IWR);
- dout(20) << "hit_dir " << type << " pop is " << v << ", frag " << dir->get_frag()
+ dout(20) << type << " pop is " << v << ", frag " << dir->get_frag()
<< " size " << dir->get_frag_size() << " " << dir->pop_me << dendl;
maybe_fragment(dir, hot);
//if (dir->ino() == inodeno_t(0x10000000002))
if (pop_sp > 0) {
- dout(20) << "hit_dir " << type << " pop " << dir_pop << " spread " << pop_sp
+ dout(20) << type << " pop " << dir_pop << " spread " << pop_sp
<< " " << dir->pop_spread.last[0]
<< " " << dir->pop_spread.last[1]
<< " " << dir->pop_spread.last[2]
}
}
-int MDBalancer::dump_loads(Formatter *f)
+int MDBalancer::dump_loads(Formatter *f) const
{
- list<CDir*> dfs;
+ std::deque<CDir*> dfs;
if (mds->mdcache->get_root()) {
mds->mdcache->get_root()->get_dirfrags(dfs);
} else {
- dout(5) << "dump_load no root" << dendl;
+ dout(10) << "no root" << dendl;
}
f->open_object_section("loads");
if (!in || !in->is_dir())
continue;
- list<CDir*> ls;
- in->get_dirfrags(ls);
- for (auto subdir : ls) {
+ auto&& ls = in->get_dirfrags();
+ for (const auto& subdir : ls) {
if (subdir->pop_nested.meta_load() < .001)
continue;
dfs.push_back(subdir);